From e082418e4a6cc3dc0ecba879cc38e9d21f85f3d9 Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Thu, 12 Dec 2019 13:27:33 -0800 Subject: [PATCH] Cleanup the sign shreds interface (#7456) automerge --- ledger/Cargo.toml | 3 + ledger/benches/sigverify_shreds.rs | 76 ++++++++++ ledger/src/shred.rs | 7 + ledger/src/sigverify_shreds.rs | 227 +++++++++++++---------------- 4 files changed, 191 insertions(+), 122 deletions(-) create mode 100644 ledger/benches/sigverify_shreds.rs diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index 35039d08f..6f5141b4d 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -62,3 +62,6 @@ solana-budget-program = { path = "../programs/budget", version = "0.22.0" } [lib] crate-type = ["lib"] name = "solana_ledger" + +[[bench]] +name = "sigverify_shreds" diff --git a/ledger/benches/sigverify_shreds.rs b/ledger/benches/sigverify_shreds.rs new file mode 100644 index 000000000..ea0c3378f --- /dev/null +++ b/ledger/benches/sigverify_shreds.rs @@ -0,0 +1,76 @@ +#![feature(test)] + +extern crate test; +use solana_ledger::shred::Shred; +use solana_ledger::shred::SIZE_OF_DATA_SHRED_PAYLOAD; +use solana_ledger::sigverify_shreds::{ + sign_shreds_cpu, sign_shreds_gpu, sign_shreds_gpu_pinned_keypair, +}; +use solana_perf::packet::{Packet, Packets}; +use solana_perf::recycler_cache::RecyclerCache; +use solana_sdk::signature::{Keypair, KeypairUtil}; +use std::sync::Arc; +use test::Bencher; + +const NUM_PACKETS: usize = 256; +const NUM_BATCHES: usize = 1; +#[bench] +fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) { + let recycler_cache = RecyclerCache::default(); + + let mut packets = Packets::default(); + packets.packets.set_pinnable(); + let slot = 0xdeadc0de; + // need to pin explicitly since the resize will not cause re-allocation + packets.packets.reserve_and_pin(NUM_PACKETS); + packets.packets.resize(NUM_PACKETS, Packet::default()); + for p in packets.packets.iter_mut() { + let shred = Shred::new_from_data( + slot, + 0xc0de, + 0xdead, + Some(&[5; SIZE_OF_DATA_SHRED_PAYLOAD]), + true, + true, + 1, + 2, + ); + shred.copy_to_packet(p); + } + let mut batch = vec![packets; NUM_BATCHES]; + let keypair = Keypair::new(); + let pinned_keypair = sign_shreds_gpu_pinned_keypair(&keypair, &recycler_cache); + let pinned_keypair = Some(Arc::new(pinned_keypair)); + //warmup + for _ in 0..100 { + sign_shreds_gpu(&keypair, &pinned_keypair, &mut batch, &recycler_cache); + } + bencher.iter(|| { + sign_shreds_gpu(&keypair, &pinned_keypair, &mut batch, &recycler_cache); + }) +} + +#[bench] +fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) { + let mut packets = Packets::default(); + let slot = 0xdeadc0de; + packets.packets.resize(NUM_PACKETS, Packet::default()); + for p in packets.packets.iter_mut() { + let shred = Shred::new_from_data( + slot, + 0xc0de, + 0xdead, + Some(&[5; SIZE_OF_DATA_SHRED_PAYLOAD]), + true, + true, + 1, + 2, + ); + shred.copy_to_packet(p); + } + let mut batch = vec![packets; NUM_BATCHES]; + let keypair = Keypair::new(); + bencher.iter(|| { + sign_shreds_cpu(&keypair, &mut batch); + }) +} diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 5f5c6bd55..b66e6d547 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -11,6 +11,7 @@ use rayon::{ }; use serde::{Deserialize, Serialize}; use solana_metrics::datapoint_debug; +use solana_perf::packet::Packet; use solana_rayon_threadlimit::get_thread_count; use solana_sdk::{ clock::Slot, @@ -137,6 +138,12 @@ impl Shred { Ok(()) } + pub fn copy_to_packet(&self, packet: &mut Packet) { + let len = self.payload.len(); + packet.data[..len].copy_from_slice(&self.payload[..]); + packet.meta.size = len; + } + pub fn new_from_data( slot: Slot, index: u32, diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 94bdcb933..1310caad7 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -1,6 +1,5 @@ #![allow(clippy::implicit_hasher)] use crate::shred::ShredType; -use ed25519_dalek::{Keypair, PublicKey, SecretKey}; use rayon::{ iter::{ IndexedParallelIterator, IntoParallelIterator, IntoParallelRefMutIterator, ParallelIterator, @@ -14,12 +13,17 @@ use solana_perf::{ packet::{limited_deserialize, Packet, Packets}, perf_libs, recycler_cache::RecyclerCache, - sigverify::{self, TxOffset}, + sigverify::{self, batch_size, TxOffset}, }; use solana_rayon_threadlimit::get_thread_count; +use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; +use solana_sdk::signature::{Keypair, KeypairUtil}; +use std::sync::Arc; use std::{cell::RefCell, collections::HashMap, mem::size_of}; +pub const SIGN_SHRED_GPU_MIN: usize = 256; + thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) .thread_name(|ix| format!("sigverify_shreds_{}", ix)) @@ -61,7 +65,7 @@ fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap) -> O fn verify_shreds_cpu(batches: &[Packets], slot_leaders: &HashMap) -> Vec> { use rayon::prelude::*; - let count = sigverify::batch_size(batches); + let count = batch_size(batches); debug!("CPU SHRED ECDSA for {}", count); let rv = PAR_THREAD_POOL.with(|thread_pool| { thread_pool.borrow().install(|| { @@ -69,7 +73,7 @@ fn verify_shreds_cpu(batches: &[Packets], slot_leaders: &HashMap) .into_par_iter() .map(|p| { p.packets - .iter() + .par_iter() .map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0)) .collect() }) @@ -148,15 +152,24 @@ fn slot_key_data_for_gpu< .push((offset_start + (slot_to_key_ix.get(slot).unwrap() * size_of::())) as u32); }); }); + let num_in_packets = resize_vec(&mut keyvec); + trace!("keyvec.len: {}", keyvec.len()); + trace!("keyvec: {:?}", keyvec); + trace!("offsets: {:?}", offsets); + (keyvec, offsets, num_in_packets) +} + +fn vec_size_in_packets(keyvec: &PinnedVec) -> usize { + (keyvec.len() + (size_of::() - 1)) / size_of::() +} + +fn resize_vec(keyvec: &mut PinnedVec) -> usize { //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data //Pad the Pubkeys buffer such that it is bigger than a buffer of Packet sized elems let num_in_packets = (keyvec.len() + (size_of::() - 1)) / size_of::(); keyvec.resize(num_in_packets * size_of::(), 0u8); - trace!("keyvec.len: {}", keyvec.len()); - trace!("keyvec: {:?}", keyvec); - trace!("offsets: {:?}", offsets); - (keyvec, offsets, num_in_packets) + num_in_packets } fn shred_gpu_offsets( @@ -207,7 +220,7 @@ pub fn verify_shreds_gpu( let mut elems = Vec::new(); let mut rvs = Vec::new(); - let count = sigverify::batch_size(batches); + let count = batch_size(batches); let (pubkeys, pubkey_offsets, mut num_packets) = slot_key_data_for_gpu(0, batches, slot_leaders, recycler_cache); //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU @@ -278,37 +291,13 @@ pub fn verify_shreds_gpu( /// ... /// } /// Signature is the first thing in the packet, and slot is the first thing in the signed message. -fn sign_shred_cpu( - packet: &mut Packet, - slot_leaders_pubkeys: &HashMap, - slot_leaders_privkeys: &HashMap, -) { +fn sign_shred_cpu(keypair: &Keypair, packet: &mut Packet) { let sig_start = 0; let sig_end = sig_start + size_of::(); - let slot_start = sig_end + size_of::(); - let slot_end = slot_start + size_of::(); let msg_start = sig_end; let msg_end = packet.meta.size; - trace!("slot start and end {} {}", slot_start, slot_end); assert!( - packet.meta.size >= slot_end, - "packet is not large enough for a slot" - ); - let slot: u64 = - limited_deserialize(&packet.data[slot_start..slot_end]).expect("can't deserialize slot"); - trace!("slot {}", slot); - let pubkey = slot_leaders_pubkeys - .get(&slot) - .expect("slot pubkey missing"); - let privkey = slot_leaders_privkeys - .get(&slot) - .expect("slot privkey missing"); - let keypair = Keypair { - secret: SecretKey::from_bytes(&privkey[0..32]).expect("dalek privkey parser"), - public: PublicKey::from_bytes(&pubkey[0..32]).expect("dalek pubkey parser"), - }; - assert!( - packet.meta.size >= sig_end, + packet.meta.size >= msg_end, "packet is not large enough for a signature" ); let signature = keypair.sign(&packet.data[msg_start..msg_end]); @@ -316,71 +305,67 @@ fn sign_shred_cpu( packet.data[0..sig_end].copy_from_slice(&signature.to_bytes()); } -fn sign_shreds_cpu( - batches: &mut [Packets], - slot_leaders_pubkeys: &HashMap, - slot_leaders_privkeys: &HashMap, -) { +pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [Packets]) { use rayon::prelude::*; - let count = sigverify::batch_size(batches); + let count = batch_size(batches); debug!("CPU SHRED ECDSA for {}", count); PAR_THREAD_POOL.with(|thread_pool| { thread_pool.borrow().install(|| { batches.par_iter_mut().for_each(|p| { - p.packets.iter_mut().for_each(|mut p| { - sign_shred_cpu(&mut p, slot_leaders_pubkeys, slot_leaders_privkeys) - }); + p.packets[..] + .par_iter_mut() + .for_each(|mut p| sign_shred_cpu(keypair, &mut p)); }); }) }); inc_new_counter_debug!("ed25519_shred_verify_cpu", count); } +pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) -> PinnedVec { + let mut vec = cache.buffer().allocate("pinned_keypair"); + let pubkey = keypair.pubkey().to_bytes(); + let secret = keypair.secret.to_bytes(); + let mut hasher = Sha512::default(); + hasher.input(&secret); + let mut result = hasher.result(); + result[0] &= 248; + result[31] &= 63; + result[31] |= 64; + vec.resize(pubkey.len() + result.len(), 0); + vec[0..pubkey.len()].copy_from_slice(&pubkey); + vec[pubkey.len()..].copy_from_slice(&result); + resize_vec(&mut vec); + vec +} + pub fn sign_shreds_gpu( + keypair: &Keypair, + pinned_keypair: &Option>>, batches: &mut [Packets], - slot_leaders_pubkeys: &HashMap, - slot_leaders_privkeys: &HashMap, recycler_cache: &RecyclerCache, ) { let sig_size = size_of::(); + let pubkey_size = size_of::(); let api = perf_libs::api(); - if api.is_none() { - return sign_shreds_cpu(batches, slot_leaders_pubkeys, slot_leaders_privkeys); + let count = batch_size(batches); + if api.is_none() || count < SIGN_SHRED_GPU_MIN || pinned_keypair.is_none() { + return sign_shreds_cpu(keypair, batches); } - let slot_leaders_secrets: HashMap = slot_leaders_privkeys - .iter() - .map(|(k, v)| { - if *k == std::u64::MAX { - (*k, Signature::default()) - } else { - let mut hasher = Sha512::default(); - hasher.input(&v); - let mut result = hasher.result(); - result[0] &= 248; - result[31] &= 63; - result[31] |= 64; - let sig = Signature::new(result.as_slice()); - (*k, sig) - } - }) - .collect(); let api = api.unwrap(); + let pinned_keypair = pinned_keypair.as_ref().unwrap(); let mut elems = Vec::new(); - let count = sigverify::batch_size(batches); - let mut offset: usize = 0; - let mut num_packets = 0; - let (pubkeys, pubkey_offsets, num_pubkey_packets) = - slot_key_data_for_gpu(offset, batches, slot_leaders_pubkeys, recycler_cache); - offset += num_pubkey_packets * size_of::(); - num_packets += num_pubkey_packets; - trace!("offset: {}", offset); - let (secrets, secret_offsets, num_secret_packets) = - slot_key_data_for_gpu(offset, batches, &slot_leaders_secrets, recycler_cache); - offset += num_secret_packets * size_of::(); - num_packets += num_secret_packets; - //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU - //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data + let offset: usize = pinned_keypair.len(); + let num_keypair_packets = vec_size_in_packets(&pinned_keypair); + let mut num_packets = num_keypair_packets; + + //should be zero + let mut pubkey_offsets = recycler_cache.offsets().allocate("pubkey offsets"); + pubkey_offsets.resize(count, 0); + + let mut secret_offsets = recycler_cache.offsets().allocate("secret_offsets"); + secret_offsets.resize(count, pubkey_size as u32); + trace!("offset: {}", offset); let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) = shred_gpu_offsets(offset, batches, recycler_cache); @@ -391,16 +376,8 @@ pub fn sign_shreds_gpu( elems.push( perf_libs::Elems { #![allow(clippy::cast_ptr_alignment)] - elems: pubkeys.as_ptr() as *const solana_sdk::packet::Packet, - num: num_pubkey_packets as u32, - }, - ); - - elems.push( - perf_libs::Elems { - #![allow(clippy::cast_ptr_alignment)] - elems: secrets.as_ptr() as *const solana_sdk::packet::Packet, - num: num_secret_packets as u32, + elems: pinned_keypair.as_ptr() as *const solana_sdk::packet::Packet, + num: num_keypair_packets as u32, }, ); @@ -439,6 +416,12 @@ pub fn sign_shreds_gpu( trace!("done sign"); let mut sizes: Vec = vec![0]; sizes.extend(batches.iter().map(|b| b.packets.len())); + for i in 0..sizes.len() { + if i == 0 { + continue; + } + sizes[i] += sizes[i - 1]; + } PAR_THREAD_POOL.with(|thread_pool| { thread_pool.borrow().install(|| { batches @@ -446,9 +429,8 @@ pub fn sign_shreds_gpu( .enumerate() .for_each(|(batch_ix, batch)| { let num_packets = sizes[batch_ix]; - batch - .packets - .iter_mut() + batch.packets[..] + .par_iter_mut() .enumerate() .for_each(|(packet_ix, packet)| { let sig_ix = packet_ix + num_packets; @@ -466,6 +448,7 @@ pub fn sign_shreds_gpu( #[cfg(test)] pub mod tests { use super::*; + use crate::shred::SIZE_OF_DATA_SHRED_PAYLOAD; use crate::shred::{Shred, Shredder}; use solana_sdk::signature::{Keypair, KeypairUtil}; #[test] @@ -559,8 +542,8 @@ pub mod tests { batch[0].packets[0].meta.size = shred.payload.len(); let leader_slots = [ + (std::u64::MAX, Pubkey::default().to_bytes()), (slot, keypair.pubkey().to_bytes()), - (std::u64::MAX, [0u8; 32]), ] .iter() .cloned() @@ -570,8 +553,8 @@ pub mod tests { let wrong_keypair = Keypair::new(); let leader_slots = [ + (std::u64::MAX, Pubkey::default().to_bytes()), (slot, wrong_keypair.pubkey().to_bytes()), - (std::u64::MAX, [0u8; 32]), ] .iter() .cloned() @@ -585,8 +568,8 @@ pub mod tests { batch[0].packets[0].meta.size = 0; let leader_slots = [ + (std::u64::MAX, Pubkey::default().to_bytes()), (slot, keypair.pubkey().to_bytes()), - (std::u64::MAX, [0u8; 32]), ] .iter() .cloned() @@ -600,38 +583,45 @@ pub mod tests { solana_logger::setup(); let recycler_cache = RecyclerCache::default(); - let mut batch = [Packets::default()]; + let mut packets = Packets::default(); + let num_packets = 32; + let num_batches = 100; let slot = 0xdeadc0de; + packets.packets.resize(num_packets, Packet::default()); + for (i, p) in packets.packets.iter_mut().enumerate() { + let shred = Shred::new_from_data( + slot, + 0xc0de, + i as u16, + Some(&[5; SIZE_OF_DATA_SHRED_PAYLOAD]), + true, + true, + 1, + 2, + ); + shred.copy_to_packet(p); + } + let mut batch = vec![packets; num_batches]; let keypair = Keypair::new(); - let shred = - Shred::new_from_data(slot, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true, 0, 0); - batch[0].packets.resize(1, Packet::default()); - batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batch[0].packets[0].meta.size = shred.payload.len(); + let pinned_keypair = sign_shreds_gpu_pinned_keypair(&keypair, &recycler_cache); + let pinned_keypair = Some(Arc::new(pinned_keypair)); let pubkeys = [ + (std::u64::MAX, Pubkey::default().to_bytes()), (slot, keypair.pubkey().to_bytes()), - (std::u64::MAX, [0u8; 32]), - ] - .iter() - .cloned() - .collect(); - let privkeys = [ - (slot, keypair.secret.to_bytes()), - (std::u64::MAX, [0u8; 32]), ] .iter() .cloned() .collect(); //unsigned let rv = verify_shreds_gpu(&batch, &pubkeys, &recycler_cache); - assert_eq!(rv, vec![vec![0]]); + assert_eq!(rv, vec![vec![0; num_packets]; num_batches]); //signed - sign_shreds_gpu(&mut batch, &pubkeys, &privkeys, &recycler_cache); + sign_shreds_gpu(&keypair, &pinned_keypair, &mut batch, &recycler_cache); let rv = verify_shreds_cpu(&batch, &pubkeys); - assert_eq!(rv, vec![vec![1]]); + assert_eq!(rv, vec![vec![1; num_packets]; num_batches]); let rv = verify_shreds_gpu(&batch, &pubkeys, &recycler_cache); - assert_eq!(rv, vec![vec![1]]); + assert_eq!(rv, vec![vec![1; num_packets]; num_batches]); } #[test] @@ -648,14 +638,7 @@ pub mod tests { batch[0].packets[0].meta.size = shred.payload.len(); let pubkeys = [ (slot, keypair.pubkey().to_bytes()), - (std::u64::MAX, [0u8; 32]), - ] - .iter() - .cloned() - .collect(); - let privkeys = [ - (slot, keypair.secret.to_bytes()), - (std::u64::MAX, [0u8; 32]), + (std::u64::MAX, Pubkey::default().to_bytes()), ] .iter() .cloned() @@ -664,7 +647,7 @@ pub mod tests { let rv = verify_shreds_cpu(&batch, &pubkeys); assert_eq!(rv, vec![vec![0]]); //signed - sign_shreds_cpu(&mut batch, &pubkeys, &privkeys); + sign_shreds_cpu(&keypair, &mut batch); let rv = verify_shreds_cpu(&batch, &pubkeys); assert_eq!(rv, vec![vec![1]]); }