diff --git a/core/src/lib.rs b/core/src/lib.rs
index 122823f2d..c530aefbd 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -50,6 +50,7 @@ pub mod rpc_subscriptions;
 pub mod sendmmsg;
 pub mod service;
 pub mod sigverify;
+pub mod sigverify_shreds;
 pub mod sigverify_stage;
 pub mod snapshot_packager_service;
 pub mod storage_stage;
diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs
index 3a7751784..09d391a0f 100644
--- a/core/src/sigverify.rs
+++ b/core/src/sigverify.rs
@@ -24,6 +24,7 @@ use std::cell::RefCell;
 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
                     .num_threads(get_thread_count())
+                    .thread_name(|ix| format!("sigverify_{}", ix))
                     .build()
                     .unwrap()));
@@ -114,7 +115,7 @@ fn verify_packet(packet: &Packet) -> u8 {
     1
 }
 
-fn batch_size(batches: &[Packets]) -> usize {
+pub fn batch_size(batches: &[Packets]) -> usize {
     batches.iter().map(|p| p.packets.len()).sum()
 }
 
@@ -271,6 +272,29 @@ pub fn ed25519_verify_disabled(batches: &[Packets]) -> Vec<Vec<u8>> {
     rv
 }
 
+pub fn copy_return_values(sig_lens: &[Vec<u32>], out: &PinnedVec<u8>, rvs: &mut Vec<Vec<u8>>) {
+    let mut num = 0;
+    for (vs, sig_vs) in rvs.iter_mut().zip(sig_lens.iter()) {
+        for (v, sig_v) in vs.iter_mut().zip(sig_vs.iter()) {
+            if *sig_v == 0 {
+                *v = 0;
+            } else {
+                let mut vout = 1;
+                for _ in 0..*sig_v {
+                    if 0 == out[num] {
+                        vout = 0;
+                    }
+                    num += 1;
+                }
+                *v = vout;
+            }
+            if *v != 0 {
+                trace!("VERIFIED PACKET!!!!!");
+            }
+        }
+    }
+}
+
 pub fn ed25519_verify(
     batches: &[Packets],
     recycler: &Recycler<TxOffset>,
@@ -340,26 +364,7 @@ pub fn ed25519_verify(
         }
     }
     trace!("done verify");
-    let mut num = 0;
-    for (vs, sig_vs) in rvs.iter_mut().zip(sig_lens.iter()) {
-        for (v, sig_v) in vs.iter_mut().zip(sig_vs.iter()) {
-            if *sig_v == 0 {
-                *v = 0;
-            } else {
-                let mut vout = 1;
-                for _ in 0..*sig_v {
-                    if 0 == out[num] {
-                        vout = 0;
-                    }
-                    num += 1;
-                }
-                *v = vout;
-            }
-            if *v != 0 {
-                trace!("VERIFIED PACKET!!!!!");
-            }
-        }
-    }
+    copy_return_values(&sig_lens, &out, &mut rvs);
     inc_new_counter_debug!("ed25519_verify_gpu", count);
     recycler_out.recycle(out);
     recycler.recycle(signature_offsets);
diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs
new file mode 100644
index 000000000..c0d80905b
--- /dev/null
+++ b/core/src/sigverify_shreds.rs
@@ -0,0 +1,408 @@
+#![allow(clippy::implicit_hasher)]
+use crate::cuda_runtime::PinnedVec;
+use crate::packet::{Packet, Packets};
+use crate::recycler::Recycler;
+use crate::recycler::Reset;
+use crate::sigverify::{self, TxOffset};
+use bincode::deserialize;
+use rayon::iter::IntoParallelIterator;
+use rayon::iter::ParallelIterator;
+use rayon::ThreadPool;
+use solana_ledger::perf_libs;
+use solana_ledger::shred::ShredType;
+use solana_metrics::inc_new_counter_debug;
+use solana_rayon_threadlimit::get_thread_count;
+use solana_sdk::pubkey::Pubkey;
+use solana_sdk::signature::Signature;
+use std::collections::HashMap;
+use std::mem::size_of;
+
+use std::cell::RefCell;
+
+thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
+                    .num_threads(get_thread_count())
+                    .thread_name(|ix| format!("sigverify_shreds_{}", ix))
+                    .build()
+                    .unwrap()));
+
+impl Reset for PinnedVec<Pubkey> {
+    fn reset(&mut self) {
+        self.resize(0, Pubkey::default());
+    }
+}
+
+/// Assuming layout is
+/// signature: Signature
+/// signed_msg: {
+///     type: ShredType
+///     slot: u64,
+///     ...
+/// }
+/// Signature is the first thing in the packet, and slot is the first thing in the signed message.
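+/// For example (illustrative numbers, assuming ed25519's 64-byte `Signature`
+/// and a single-byte `ShredType` discriminant): the signature occupies bytes
+/// 0..64, the signed message begins at byte 64, and the slot is read from
+/// bytes 65..73. The offsets below are computed from `size_of` rather than
+/// hard-coded.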
+fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap<u64, Pubkey>) -> Option<u8> {
+    let sig_start = 0;
+    let sig_end = size_of::<Signature>();
+    let slot_start = sig_end + size_of::<ShredType>();
+    let slot_end = slot_start + size_of::<u64>();
+    let msg_start = sig_end;
+    let msg_end = packet.meta.size;
+    trace!("slot start and end {} {}", slot_start, slot_end);
+    if packet.meta.size < slot_end {
+        return Some(0);
+    }
+    let slot: u64 = deserialize(&packet.data[slot_start..slot_end]).ok()?;
+    trace!("slot {}", slot);
+    let pubkey: &Pubkey = slot_leaders.get(&slot)?;
+    if packet.meta.size < sig_end {
+        return Some(0);
+    }
+    let signature = Signature::new(&packet.data[sig_start..sig_end]);
+    trace!("signature {}", signature);
+    if !signature.verify(pubkey.as_ref(), &packet.data[msg_start..msg_end]) {
+        return Some(0);
+    }
+    Some(1)
+}
+
+pub fn verify_shreds_cpu(batches: &[Packets], slot_leaders: &HashMap<u64, Pubkey>) -> Vec<Vec<u8>> {
+    use rayon::prelude::*;
+    let count = sigverify::batch_size(batches);
+    debug!("CPU SHRED ECDSA for {}", count);
+    let rv = PAR_THREAD_POOL.with(|thread_pool| {
+        thread_pool.borrow().install(|| {
+            batches
+                .into_par_iter()
+                .map(|p| {
+                    p.packets
+                        .par_iter()
+                        .map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0))
+                        .collect()
+                })
+                .collect()
+        })
+    });
+    inc_new_counter_debug!("ed25519_shred_verify_cpu", count);
+    rv
+}
+
+fn shred_gpu_pubkeys(
+    batches: &[Packets],
+    slot_leaders: &HashMap<u64, Pubkey>,
+    recycler_offsets: &Recycler<TxOffset>,
+    recycler_pubkeys: &Recycler<PinnedVec<Pubkey>>,
+) -> (PinnedVec<Pubkey>, TxOffset, usize) {
+    //TODO: mark Pubkey::default shreds as failed after the GPU returns
+    assert_eq!(slot_leaders.get(&std::u64::MAX), Some(&Pubkey::default()));
+    let slots: Vec<Vec<u64>> = PAR_THREAD_POOL.with(|thread_pool| {
+        thread_pool.borrow().install(|| {
+            batches
+                .into_par_iter()
+                .map(|p| {
+                    p.packets
+                        .iter()
+                        .map(|packet| {
+                            let slot_start = size_of::<Signature>() + size_of::<ShredType>();
+                            let slot_end = slot_start + size_of::<u64>();
+                            if packet.meta.size < slot_end {
+                                return std::u64::MAX;
+                            }
+                            let slot: Option<u64> =
+                                deserialize(&packet.data[slot_start..slot_end]).ok();
+                            match slot {
+                                Some(slot) if slot_leaders.get(&slot).is_some() => slot,
+                                _ => std::u64::MAX,
+                            }
+                        })
+                        .collect()
+                })
+                .collect()
+        })
+    });
+    let mut keys_to_slots: HashMap<Pubkey, Vec<u64>> = HashMap::new();
+    for batch in slots.iter() {
+        for slot in batch.iter() {
+            let key = slot_leaders.get(slot).unwrap();
+            keys_to_slots
+                .entry(*key)
+                .or_insert_with(|| vec![])
+                .push(*slot);
+        }
+    }
+    let mut pubkeys: PinnedVec<Pubkey> = recycler_pubkeys.allocate("shred_gpu_pubkeys");
+    let mut slot_to_key_ix = HashMap::new();
+    for (i, (k, slots)) in keys_to_slots.iter().enumerate() {
+        pubkeys.push(*k);
+        for s in slots {
+            slot_to_key_ix.insert(s, i);
+        }
+    }
+    let mut offsets = recycler_offsets.allocate("shred_offsets");
+    slots.iter().for_each(|packet_slots| {
+        packet_slots.iter().for_each(|slot| {
+            offsets.push((slot_to_key_ix.get(slot).unwrap() * size_of::<Pubkey>()) as u32);
+        });
+    });
+    //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU
+    //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data
+    //Pad the Pubkeys buffer such that it is bigger than a buffer of Packet sized elems
+    let num_in_packets =
+        (pubkeys.len() * size_of::<Pubkey>() + (size_of::<Packet>() - 1)) / size_of::<Packet>();
+    trace!("num_in_packets {}", num_in_packets);
+    //number of bytes missing
+    let missing = num_in_packets * size_of::<Packet>() - pubkeys.len() * size_of::<Pubkey>();
+    trace!("missing {}", missing);
+    //extra Pubkeys needed to fill the buffer
+    let extra = (missing + size_of::<Pubkey>() - 1) / size_of::<Pubkey>();
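+    // Illustrative (assuming 32-byte pubkeys): three distinct leaders need
+    // 96 bytes, which rounds up to one Packet-sized slot; `missing` is the
+    // shortfall and `extra` default pubkeys are pushed to fill it exactly.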
+    trace!("extra {}", extra);
+    trace!("pubkeys {}", pubkeys.len());
+    for _ in 0..extra {
+        pubkeys.push(Pubkey::default());
+        trace!("pubkeys {}", pubkeys.len());
+    }
+    trace!("pubkeys {:?}", pubkeys);
+    trace!("offsets {:?}", offsets);
+    (pubkeys, offsets, num_in_packets)
+}
+
+fn shred_gpu_offsets(
+    mut pubkeys_end: u32,
+    batches: &[Packets],
+    recycler_offsets: &Recycler<TxOffset>,
+) -> (TxOffset, TxOffset, TxOffset, Vec<Vec<u32>>) {
+    let mut signature_offsets = recycler_offsets.allocate("shred_signatures");
+    let mut msg_start_offsets = recycler_offsets.allocate("shred_msg_starts");
+    let mut msg_sizes = recycler_offsets.allocate("shred_msg_sizes");
+    let mut v_sig_lens = vec![];
+    for batch in batches {
+        let mut sig_lens = Vec::new();
+        for packet in &batch.packets {
+            let sig_start = pubkeys_end;
+            let sig_end = sig_start + size_of::<Signature>() as u32;
+            let msg_start = sig_end;
+            let msg_end = sig_start + packet.meta.size as u32;
+            signature_offsets.push(sig_start);
+            msg_start_offsets.push(msg_start);
+            let msg_size = if msg_end < msg_start {
+                0
+            } else {
+                msg_end - msg_start
+            };
+            msg_sizes.push(msg_size);
+            sig_lens.push(1);
+            pubkeys_end += size_of::<Packet>() as u32;
+        }
+        v_sig_lens.push(sig_lens);
+    }
+    (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens)
+}
+
+pub fn verify_shreds_gpu(
+    batches: &[Packets],
+    slot_leaders: &HashMap<u64, Pubkey>,
+    recycler_offsets: &Recycler<TxOffset>,
+    recycler_pubkeys: &Recycler<PinnedVec<Pubkey>>,
+    recycler_out: &Recycler<PinnedVec<u8>>,
+) -> Vec<Vec<u8>> {
+    let api = perf_libs::api();
+    if api.is_none() {
+        return verify_shreds_cpu(batches, slot_leaders);
+    }
+    let api = api.unwrap();
+
+    let mut elems = Vec::new();
+    let mut rvs = Vec::new();
+    let count = sigverify::batch_size(batches);
+    let (pubkeys, pubkey_offsets, mut num_packets) =
+        shred_gpu_pubkeys(batches, slot_leaders, recycler_offsets, recycler_pubkeys);
+    //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU
+    //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data
+    let pubkeys_len = (num_packets * size_of::<Packet>()) as u32;
+    trace!("num_packets: {}", num_packets);
+    trace!("pubkeys_len: {}", pubkeys_len);
+    let (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) =
+        shred_gpu_offsets(pubkeys_len, batches, recycler_offsets);
+    let mut out = recycler_out.allocate("out_buffer");
+    out.set_pinnable();
+    #[allow(clippy::cast_ptr_alignment)]
+    elems.push(perf_libs::Elems {
+        elems: pubkeys.as_ptr() as *const solana_sdk::packet::Packet,
+        num: num_packets as u32,
+    });
+
+    for p in batches {
+        elems.push(perf_libs::Elems {
+            elems: p.packets.as_ptr(),
+            num: p.packets.len() as u32,
+        });
+        let mut v = Vec::new();
+        v.resize(p.packets.len(), 0);
+        rvs.push(v);
+        num_packets += p.packets.len();
+    }
+    out.resize(signature_offsets.len(), 0);
+
+    trace!("Starting verify num packets: {}", num_packets);
+    trace!("elem len: {}", elems.len() as u32);
+    trace!("packet sizeof: {}", size_of::<Packet>() as u32);
+    const USE_NON_DEFAULT_STREAM: u8 = 1;
+    unsafe {
+        let res = (api.ed25519_verify_many)(
+            elems.as_ptr(),
+            elems.len() as u32,
+            size_of::<Packet>() as u32,
+            num_packets as u32,
+            signature_offsets.len() as u32,
+            msg_sizes.as_ptr(),
+            pubkey_offsets.as_ptr(),
+            signature_offsets.as_ptr(),
+            msg_start_offsets.as_ptr(),
+            out.as_mut_ptr(),
+            USE_NON_DEFAULT_STREAM,
+        );
+        if res != 0 {
+            trace!("RETURN!!!: {}", res);
+        }
+    }
+    trace!("done verify");
+    trace!("out buf {:?}", out);
+
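+    // `out` holds one pass/fail byte per signature the GPU checked; fold those
+    // verdicts back into one result byte per packet (each shred carries exactly
+    // one signature, so every `v_sig_lens` entry is 1).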
+    sigverify::copy_return_values(&v_sig_lens, &out, &mut rvs);
+
+    inc_new_counter_debug!("ed25519_shred_verify_gpu", count);
+    recycler_out.recycle(out);
+    recycler_offsets.recycle(signature_offsets);
+    recycler_offsets.recycle(pubkey_offsets);
+    recycler_offsets.recycle(msg_sizes);
+    recycler_offsets.recycle(msg_start_offsets);
+    recycler_pubkeys.recycle(pubkeys);
+    rvs
+}
+
+#[cfg(test)]
+pub mod tests {
+    use super::*;
+    use solana_ledger::shred::{Shred, Shredder};
+    use solana_sdk::signature::{Keypair, KeypairUtil};
+    #[test]
+    fn test_sigverify_shred_cpu() {
+        solana_logger::setup();
+        let mut packet = Packet::default();
+        let slot = 0xdeadc0de;
+        let mut shred = Shred::new_from_data(slot, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true);
+        assert_eq!(shred.slot(), slot);
+        let keypair = Keypair::new();
+        Shredder::sign_shred(&keypair, &mut shred);
+        trace!("signature {}", shred.common_header.signature);
+        packet.data[0..shred.payload.len()].copy_from_slice(&shred.payload);
+        packet.meta.size = shred.payload.len();
+
+        let mut leader_slots: HashMap<u64, Pubkey> = HashMap::new();
+        leader_slots.insert(slot, keypair.pubkey());
+        let rv = verify_shred_cpu(&packet, &leader_slots);
+        assert_eq!(rv, Some(1));
+
+        let wrong_keypair = Keypair::new();
+        leader_slots.insert(slot, wrong_keypair.pubkey());
+        let rv = verify_shred_cpu(&packet, &leader_slots);
+        assert_eq!(rv, Some(0));
+
+        leader_slots.remove(&slot);
+        let rv = verify_shred_cpu(&packet, &leader_slots);
+        assert_eq!(rv, None);
+    }
+
+    #[test]
+    fn test_sigverify_shreds_cpu() {
+        solana_logger::setup();
+        let mut batch = [Packets::default()];
+        let slot = 0xdeadc0de;
+        let mut shred = Shred::new_from_data(slot, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true);
+        let keypair = Keypair::new();
+        Shredder::sign_shred(&keypair, &mut shred);
+        batch[0].packets.resize(1, Packet::default());
+        batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
+        batch[0].packets[0].meta.size = shred.payload.len();
+
+        let mut leader_slots: HashMap<u64, Pubkey> = HashMap::new();
+        leader_slots.insert(slot, keypair.pubkey());
+        let rv = verify_shreds_cpu(&batch, &leader_slots);
+        assert_eq!(rv, vec![vec![1]]);
+
+        let wrong_keypair = Keypair::new();
+        leader_slots.insert(slot, wrong_keypair.pubkey());
+        let rv = verify_shreds_cpu(&batch, &leader_slots);
+        assert_eq!(rv, vec![vec![0]]);
+
+        leader_slots.remove(&slot);
+        let rv = verify_shreds_cpu(&batch, &leader_slots);
+        assert_eq!(rv, vec![vec![0]]);
+
+        leader_slots.insert(slot, keypair.pubkey());
+        batch[0].packets[0].meta.size = 0;
+        let rv = verify_shreds_cpu(&batch, &leader_slots);
+        assert_eq!(rv, vec![vec![0]]);
+    }
+
+    #[test]
+    fn test_sigverify_shreds_gpu() {
+        solana_logger::setup();
+        let recycler_offsets = Recycler::default();
+        let recycler_pubkeys = Recycler::default();
+        let recycler_out = Recycler::default();
+        let mut leader_slots: HashMap<u64, Pubkey> = HashMap::new();
+        leader_slots.insert(std::u64::MAX, Pubkey::default());
+
+        let mut batch = [Packets::default()];
+        let slot = 0xdeadc0de;
+        let mut shred = Shred::new_from_data(slot, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true);
+        let keypair = Keypair::new();
+        Shredder::sign_shred(&keypair, &mut shred);
+        batch[0].packets.resize(1, Packet::default());
+        batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload);
+        batch[0].packets[0].meta.size = shred.payload.len();
+
+        leader_slots.insert(slot, keypair.pubkey());
+        let rv = verify_shreds_gpu(
+            &batch,
+            &leader_slots,
+            &recycler_offsets,
+            &recycler_pubkeys,
+            &recycler_out,
+        );
+        assert_eq!(rv, vec![vec![1]]);
+
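+        // The remaining cases must all fail: a signature from the wrong
+        // keypair, a slot with no registered leader, and a zero-length packet.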
+        let wrong_keypair = Keypair::new();
+        leader_slots.insert(slot, wrong_keypair.pubkey());
+        let rv = verify_shreds_gpu(
+            &batch,
+            &leader_slots,
+            &recycler_offsets,
+            &recycler_pubkeys,
+            &recycler_out,
+        );
+        assert_eq!(rv, vec![vec![0]]);
+
+        leader_slots.remove(&slot);
+        let rv = verify_shreds_gpu(
+            &batch,
+            &leader_slots,
+            &recycler_offsets,
+            &recycler_pubkeys,
+            &recycler_out,
+        );
+        assert_eq!(rv, vec![vec![0]]);
+
+        batch[0].packets[0].meta.size = 0;
+        leader_slots.insert(slot, keypair.pubkey());
+        let rv = verify_shreds_gpu(
+            &batch,
+            &leader_slots,
+            &recycler_offsets,
+            &recycler_pubkeys,
+            &recycler_out,
+        );
+        assert_eq!(rv, vec![vec![0]]);
+    }
+}
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index df3bfdaee..539200c07 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -442,7 +442,7 @@ impl Shredder {
         (data_shreds, coding_shreds, last_shred_index + 1)
     }
 
-    pub fn sign_shred(signer: &Arc<Keypair>, shred: &mut Shred) {
+    pub fn sign_shred(signer: &Keypair, shred: &mut Shred) {
         let signature = signer.sign_message(&shred.payload[SIZE_OF_SIGNATURE..]);
         bincode::serialize_into(&mut shred.payload[..SIZE_OF_SIGNATURE], &signature)
             .expect("Failed to generate serialized signature");