diff --git a/Cargo.lock b/Cargo.lock index f682633dd..dbad13861 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3552,7 +3552,9 @@ dependencies = [ "rocksdb 0.12.4 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)", + "sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "solana-budget-api 0.21.0", + "solana-ed25519-dalek 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "solana-genesis-programs 0.21.0", "solana-logger 0.21.0", "solana-measure 0.21.0", @@ -3758,10 +3760,19 @@ dependencies = [ name = "solana-perf" version = "0.21.0" dependencies = [ + "bincode 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "dlopen 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "dlopen_derive 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)", + "solana-budget-api 0.21.0", + "solana-logger 0.21.0", + "solana-metrics 0.21.0", + "solana-rayon-threadlimit 0.21.0", "solana-sdk 0.21.0", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index 2504b3ef6..d61248a0e 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -92,9 +92,6 @@ name = "blocktree" [[bench]] name = "gen_keys" -[[bench]] -name = "sigverify" - [[bench]] name = "sigverify_stage" diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 64e3c1591..f682303d0 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -15,10 +15,10 @@ use solana_core::genesis_utils::{create_genesis_block, GenesisBlockInfo}; use solana_core::packet::to_packets_chunked; use solana_core::poh_recorder::WorkingBankEntry; use solana_core::service::Service; -use solana_core::test_tx::test_tx; use solana_ledger::blocktree::{get_tmp_ledger_path, Blocktree}; use solana_ledger::blocktree_processor::process_entries; use solana_ledger::entry::{next_hash, Entry}; +use solana_perf::test_tx::test_tx; use solana_runtime::bank::Bank; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index 116c982a7..51d4c054a 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -9,10 +9,10 @@ use solana_core::contact_info::ContactInfo; use solana_core::genesis_utils::{create_genesis_block, GenesisBlockInfo}; use solana_core::packet::to_packets_chunked; use solana_core::retransmit_stage::retransmitter; -use solana_core::test_tx::test_tx; use solana_ledger::bank_forks::BankForks; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; use solana_measure::measure::Measure; +use solana_perf::test_tx::test_tx; use solana_runtime::bank::Bank; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index c72b5cc8d..6ef4f738e 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -2,12 +2,12 @@ extern crate test; -use solana_core::test_tx; use solana_ledger::entry::{create_ticks, Entry}; use solana_ledger::shred::{ max_entries_per_n_shred, max_ticks_per_n_shreds, Shred, Shredder, RECOMMENDED_FEC_RATE, SIZE_OF_DATA_SHRED_PAYLOAD, }; +use solana_perf::test_tx; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::sync::Arc; diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index 147f77ae2..db1c1fe8d 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -10,7 +10,7 @@ use solana_core::packet::to_packets_chunked; use solana_core::service::Service; use solana_core::sigverify::TransactionSigVerifier; use solana_core::sigverify_stage::SigVerifyStage; -use solana_core::test_tx::test_tx; +use solana_perf::test_tx::test_tx; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 9b41bde2f..7526f619d 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1812,7 +1812,6 @@ mod tests { use crate::crds_value::CrdsValueLabel; use crate::repair_service::RepairType; use crate::result::Error; - use crate::test_tx::test_tx; use rayon::prelude::*; use solana_ledger::blocktree::get_tmp_ledger_path; use solana_ledger::blocktree::make_many_slot_entries; @@ -1821,6 +1820,7 @@ mod tests { use solana_ledger::shred::{ max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, }; + use solana_perf::test_tx::test_tx; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::collections::HashSet; diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 0e45922f2..e622e019f 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -235,8 +235,8 @@ impl CrdsValue { mod test { use super::*; use crate::contact_info::ContactInfo; - use crate::test_tx::test_tx; use bincode::deserialize; + use solana_perf::test_tx::test_tx; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::timing::timestamp; diff --git a/core/src/lib.rs b/core/src/lib.rs index 0e6e6f05a..7b499c7b5 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -54,7 +54,6 @@ pub mod sigverify_stage; pub mod snapshot_packager_service; pub mod storage_stage; pub mod streamer; -pub mod test_tx; pub mod tpu; pub mod tvu; pub mod validator; diff --git a/core/src/packet.rs b/core/src/packet.rs index e8320f691..43f723e29 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -3,15 +3,14 @@ use crate::{ recvmmsg::{recv_mmsg, NUM_RCVMMSGS}, result::{Error, Result}, }; -use bincode; -use serde::Serialize; -pub use solana_ledger::packet::{ - Packets, PacketsRecycler, NUM_PACKETS, PACKETS_BATCH_SIZE, PACKETS_PER_BATCH, +pub use solana_perf::packet::{ + limited_deserialize, to_packets, to_packets_chunked, Packets, PacketsRecycler, NUM_PACKETS, + PACKETS_BATCH_SIZE, PACKETS_PER_BATCH, }; use solana_metrics::inc_new_counter_debug; pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}; -use std::{io, net::UdpSocket, time::Instant}; +use std::{net::UdpSocket, time::Instant}; pub fn recv_from(obj: &mut Packets, socket: &UdpSocket) -> Result { let mut i = 0; @@ -65,41 +64,9 @@ pub fn send_to(obj: &Packets, socket: &UdpSocket) -> Result<()> { Ok(()) } -pub fn to_packets_chunked(xs: &[T], chunks: usize) -> Vec { - let mut out = vec![]; - for x in xs.chunks(chunks) { - let mut p = Packets::default(); - p.packets.resize(x.len(), Packet::default()); - for (i, o) in x.iter().zip(p.packets.iter_mut()) { - let mut wr = io::Cursor::new(&mut o.data[..]); - bincode::serialize_into(&mut wr, &i).expect("serialize request"); - let len = wr.position() as usize; - o.meta.size = len; - } - out.push(p); - } - out -} - -pub fn to_packets(xs: &[T]) -> Vec { - to_packets_chunked(xs, NUM_PACKETS) -} - -pub fn limited_deserialize(data: &[u8]) -> bincode::Result -where - T: serde::de::DeserializeOwned, -{ - bincode::config() - .limit(PACKET_DATA_SIZE as u64) - .deserialize(data) -} - #[cfg(test)] mod tests { use super::*; - use solana_sdk::hash::Hash; - use solana_sdk::signature::{Keypair, KeypairUtil}; - use solana_sdk::system_transaction; use std::io; use std::io::Write; use std::net::{SocketAddr, UdpSocket}; @@ -141,25 +108,6 @@ mod tests { } } - #[test] - fn test_to_packets() { - let keypair = Keypair::new(); - let hash = Hash::new(&[1; 32]); - let tx = system_transaction::transfer(&keypair, &keypair.pubkey(), 1, hash); - let rv = to_packets(&vec![tx.clone(); 1]); - assert_eq!(rv.len(), 1); - assert_eq!(rv[0].packets.len(), 1); - - let rv = to_packets(&vec![tx.clone(); NUM_PACKETS]); - assert_eq!(rv.len(), 1); - assert_eq!(rv[0].packets.len(), NUM_PACKETS); - - let rv = to_packets(&vec![tx.clone(); NUM_PACKETS + 1]); - assert_eq!(rv.len(), 2); - assert_eq!(rv[0].packets.len(), NUM_PACKETS); - assert_eq!(rv[1].packets.len(), 1); - } - #[test] pub fn debug_trait() { write!(io::sink(), "{:?}", Packet::default()).unwrap(); diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index 2d3107872..8329c1000 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -469,8 +469,8 @@ impl PohRecorder { mod tests { use super::*; use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; - use crate::test_tx::test_tx; use solana_ledger::blocktree::{get_tmp_ledger_path, Blocktree}; + use solana_perf::test_tx::test_tx; use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT; use solana_sdk::hash::hash; use std::sync::mpsc::sync_channel; diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index 948ff13f5..7169347fe 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -112,9 +112,9 @@ mod tests { use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; use crate::poh_recorder::WorkingBank; use crate::result::Result; - use crate::test_tx::test_tx; use solana_ledger::blocktree::{get_tmp_ledger_path, Blocktree}; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; + use solana_perf::test_tx::test_tx; use solana_runtime::bank::Bank; use solana_sdk::hash::hash; use solana_sdk::pubkey::Pubkey; diff --git a/core/src/recvmmsg.rs b/core/src/recvmmsg.rs index b30b06787..9939c3fe9 100644 --- a/core/src/recvmmsg.rs +++ b/core/src/recvmmsg.rs @@ -1,7 +1,7 @@ //! The `recvmmsg` module provides recvmmsg() API implementation use crate::packet::Packet; -pub use solana_ledger::packet::NUM_RCVMMSGS; +pub use solana_perf::packet::NUM_RCVMMSGS; use std::cmp; use std::io; use std::net::UdpSocket; diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 216699031..8f30764b3 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -1,8 +1,8 @@ //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel. +use crate::packet::Packet; use crate::service::Service; use crate::streamer::{self, PacketReceiver, PacketSender}; -use solana_ledger::packet::Packet; use solana_perf::cuda_runtime::PinnedVec; use solana_perf::recycler::Recycler; use std::net::UdpSocket; diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 460f200fe..75446fc47 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -4,21 +4,14 @@ //! to the GPU. //! -use crate::packet::{Packet, Packets}; use crate::sigverify_stage::SigVerifier; -use bincode::serialized_size; -use rayon::ThreadPool; -use solana_metrics::inc_new_counter_debug; use solana_perf::cuda_runtime::PinnedVec; -use solana_perf::perf_libs; +use solana_perf::packet::Packets; use solana_perf::recycler::Recycler; -use solana_sdk::message::MessageHeader; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::short_vec::decode_len; -use solana_sdk::signature::Signature; -#[cfg(test)] -use solana_sdk::transaction::Transaction; -use std::mem::size_of; +use solana_perf::sigverify; +pub use solana_perf::sigverify::{ + batch_size, ed25519_verify_cpu, ed25519_verify_disabled, init, TxOffset, +}; #[derive(Clone)] pub struct TransactionSigVerifier { @@ -38,7 +31,7 @@ impl Default for TransactionSigVerifier { impl SigVerifier for TransactionSigVerifier { fn verify_batch(&self, mut batch: Vec) -> Vec { - let r = ed25519_verify(&batch, &self.recycler, &self.recycler_out); + let r = sigverify::ed25519_verify(&batch, &self.recycler, &self.recycler_out); mark_disabled(&mut batch, &r); batch } @@ -53,717 +46,19 @@ pub fn mark_disabled(batches: &mut Vec, r: &[Vec]) { }); } -use solana_rayon_threadlimit::get_thread_count; -use std::cell::RefCell; - -thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() - .num_threads(get_thread_count()) - .thread_name(|ix| format!("sigverify_{}", ix)) - .build() - .unwrap())); - -pub type TxOffset = PinnedVec; - -type TxOffsets = (TxOffset, TxOffset, TxOffset, TxOffset, Vec>); - -#[derive(Debug, PartialEq, Eq)] -struct PacketOffsets { - pub sig_len: u32, - pub sig_start: u32, - pub msg_start: u32, - pub pubkey_start: u32, -} - -impl PacketOffsets { - pub fn new(sig_len: u32, sig_start: u32, msg_start: u32, pubkey_start: u32) -> Self { - Self { - sig_len, - sig_start, - msg_start, - pubkey_start, - } - } -} - -#[derive(Debug, PartialEq)] -pub enum PacketError { - InvalidLen, - InvalidPubkeyLen, - InvalidShortVec, - InvalidSignatureLen, - MismatchSignatureLen, - PayerNotWritable, -} - -impl std::convert::From> for PacketError { - fn from(_e: std::boxed::Box) -> PacketError { - PacketError::InvalidShortVec - } -} - -pub fn init() { - if let Some(api) = perf_libs::api() { - unsafe { - (api.ed25519_set_verbose)(true); - if !(api.ed25519_init)() { - panic!("ed25519_init() failed"); - } - (api.ed25519_set_verbose)(false); - } - } -} - -fn verify_packet(packet: &Packet) -> u8 { - let packet_offsets = get_packet_offsets(packet, 0); - let mut sig_start = packet_offsets.sig_start as usize; - let mut pubkey_start = packet_offsets.pubkey_start as usize; - let msg_start = packet_offsets.msg_start as usize; - - if packet_offsets.sig_len == 0 { - return 0; - } - - if packet.meta.size <= msg_start { - return 0; - } - - let msg_end = packet.meta.size; - for _ in 0..packet_offsets.sig_len { - let pubkey_end = pubkey_start as usize + size_of::(); - let sig_end = sig_start as usize + size_of::(); - - if pubkey_end >= packet.meta.size || sig_end >= packet.meta.size { - return 0; - } - - let signature = Signature::new(&packet.data[sig_start..sig_end]); - if !signature.verify( - &packet.data[pubkey_start..pubkey_end], - &packet.data[msg_start..msg_end], - ) { - return 0; - } - pubkey_start += size_of::(); - sig_start += size_of::(); - } - 1 -} - -pub fn batch_size(batches: &[Packets]) -> usize { - batches.iter().map(|p| p.packets.len()).sum() -} - -// internal function to be unit-tested; should be used only by get_packet_offsets -fn do_get_packet_offsets( - packet: &Packet, - current_offset: u32, -) -> Result { - let message_header_size = serialized_size(&MessageHeader::default()).unwrap() as usize; - // should have at least 1 signature, sig lengths and the message header - if (1 + size_of::() + message_header_size) > packet.meta.size { - return Err(PacketError::InvalidLen); - } - - // read the length of Transaction.signatures (serialized with short_vec) - let (sig_len_untrusted, sig_size) = decode_len(&packet.data)?; - - // Using msg_start_offset which is based on sig_len_untrusted introduces uncertainty. - // Ultimately, the actual sigverify will determine the uncertainty. - let msg_start_offset = sig_size + sig_len_untrusted * size_of::(); - - // Packet should have data at least for signatures, MessageHeader, 1 byte for Message.account_keys.len - if (msg_start_offset + message_header_size + 1) > packet.meta.size { - return Err(PacketError::InvalidSignatureLen); - } - - // read MessageHeader.num_required_signatures (serialized with u8) - let sig_len_maybe_trusted = packet.data[msg_start_offset] as usize; - - let message_account_keys_len_offset = msg_start_offset + message_header_size; - - // This reads and compares the MessageHeader num_required_signatures and - // num_readonly_signed_accounts bytes. If num_required_signatures is not larger than - // num_readonly_signed_accounts, the first account is not writable, and cannot be charged - // required transaction fees. - if packet.data[msg_start_offset] <= packet.data[msg_start_offset + 1] { - return Err(PacketError::PayerNotWritable); - } - - // read the length of Message.account_keys (serialized with short_vec) - let (pubkey_len, pubkey_len_size) = - decode_len(&packet.data[message_account_keys_len_offset..])?; - - if (message_account_keys_len_offset + pubkey_len * size_of::() + pubkey_len_size) - > packet.meta.size - { - return Err(PacketError::InvalidPubkeyLen); - } - - let sig_start = current_offset as usize + sig_size; - let msg_start = current_offset as usize + msg_start_offset; - let pubkey_start = msg_start + message_header_size + pubkey_len_size; - - if sig_len_maybe_trusted != sig_len_untrusted { - return Err(PacketError::MismatchSignatureLen); - } - - Ok(PacketOffsets::new( - sig_len_untrusted as u32, - sig_start as u32, - msg_start as u32, - pubkey_start as u32, - )) -} - -fn get_packet_offsets(packet: &Packet, current_offset: u32) -> PacketOffsets { - let unsanitized_packet_offsets = do_get_packet_offsets(packet, current_offset); - if let Ok(offsets) = unsanitized_packet_offsets { - offsets - } else { - // force sigverify to fail by returning zeros - PacketOffsets::new(0, 0, 0, 0) - } -} - -pub fn generate_offsets( - batches: &[Packets], - recycler: &Recycler, -) -> Result { - debug!("allocating.."); - let mut signature_offsets: PinnedVec<_> = recycler.allocate("sig_offsets"); - signature_offsets.set_pinnable(); - let mut pubkey_offsets: PinnedVec<_> = recycler.allocate("pubkey_offsets"); - pubkey_offsets.set_pinnable(); - let mut msg_start_offsets: PinnedVec<_> = recycler.allocate("msg_start_offsets"); - msg_start_offsets.set_pinnable(); - let mut msg_sizes: PinnedVec<_> = recycler.allocate("msg_size_offsets"); - msg_sizes.set_pinnable(); - let mut current_packet = 0; - let mut v_sig_lens = Vec::new(); - batches.iter().for_each(|p| { - let mut sig_lens = Vec::new(); - p.packets.iter().for_each(|packet| { - let current_offset = current_packet as u32 * size_of::() as u32; - - let packet_offsets = get_packet_offsets(packet, current_offset); - - sig_lens.push(packet_offsets.sig_len); - - trace!("pubkey_offset: {}", packet_offsets.pubkey_start); - - let mut pubkey_offset = packet_offsets.pubkey_start; - let mut sig_offset = packet_offsets.sig_start; - for _ in 0..packet_offsets.sig_len { - signature_offsets.push(sig_offset); - sig_offset += size_of::() as u32; - - pubkey_offsets.push(pubkey_offset); - pubkey_offset += size_of::() as u32; - - msg_start_offsets.push(packet_offsets.msg_start); - - msg_sizes - .push(current_offset + (packet.meta.size as u32) - packet_offsets.msg_start); - } - current_packet += 1; - }); - v_sig_lens.push(sig_lens); - }); - Ok(( - signature_offsets, - pubkey_offsets, - msg_start_offsets, - msg_sizes, - v_sig_lens, - )) -} - -pub fn ed25519_verify_cpu(batches: &[Packets]) -> Vec> { - use rayon::prelude::*; - let count = batch_size(batches); - debug!("CPU ECDSA for {}", batch_size(batches)); - let rv = PAR_THREAD_POOL.with(|thread_pool| { - thread_pool.borrow().install(|| { - batches - .into_par_iter() - .map(|p| p.packets.par_iter().map(verify_packet).collect()) - .collect() - }) - }); - inc_new_counter_debug!("ed25519_verify_cpu", count); - rv -} - -pub fn ed25519_verify_disabled(batches: &[Packets]) -> Vec> { - use rayon::prelude::*; - let count = batch_size(batches); - debug!("disabled ECDSA for {}", batch_size(batches)); - let rv = batches - .into_par_iter() - .map(|p| vec![1u8; p.packets.len()]) - .collect(); - inc_new_counter_debug!("ed25519_verify_disabled", count); - rv -} - -pub fn copy_return_values(sig_lens: &[Vec], out: &PinnedVec, rvs: &mut Vec>) { - let mut num = 0; - for (vs, sig_vs) in rvs.iter_mut().zip(sig_lens.iter()) { - for (v, sig_v) in vs.iter_mut().zip(sig_vs.iter()) { - if *sig_v == 0 { - *v = 0; - } else { - let mut vout = 1; - for _ in 0..*sig_v { - if 0 == out[num] { - vout = 0; - } - num += 1; - } - *v = vout; - } - if *v != 0 { - trace!("VERIFIED PACKET!!!!!"); - } - } - } -} - -pub fn ed25519_verify( - batches: &[Packets], - recycler: &Recycler, - recycler_out: &Recycler>, -) -> Vec> { - let api = perf_libs::api(); - if api.is_none() { - return ed25519_verify_cpu(batches); - } - let api = api.unwrap(); - - use crate::packet::PACKET_DATA_SIZE; - let count = batch_size(batches); - - // micro-benchmarks show GPU time for smallest batch around 15-20ms - // and CPU speed for 64-128 sigverifies around 10-20ms. 64 is a nice - // power-of-two number around that accounting for the fact that the CPU - // may be busy doing other things while being a real validator - // TODO: dynamically adjust this crossover - if count < 64 { - return ed25519_verify_cpu(batches); - } - - let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) = - generate_offsets(batches, recycler).unwrap(); - - debug!("CUDA ECDSA for {}", batch_size(batches)); - debug!("allocating out.."); - let mut out = recycler_out.allocate("out_buffer"); - out.set_pinnable(); - let mut elems = Vec::new(); - let mut rvs = Vec::new(); - - let mut num_packets = 0; - for p in batches { - elems.push(perf_libs::Elems { - elems: p.packets.as_ptr(), - num: p.packets.len() as u32, - }); - let mut v = Vec::new(); - v.resize(p.packets.len(), 0); - rvs.push(v); - num_packets += p.packets.len(); - } - out.resize(signature_offsets.len(), 0); - trace!("Starting verify num packets: {}", num_packets); - trace!("elem len: {}", elems.len() as u32); - trace!("packet sizeof: {}", size_of::() as u32); - trace!("len offset: {}", PACKET_DATA_SIZE as u32); - const USE_NON_DEFAULT_STREAM: u8 = 1; - unsafe { - let res = (api.ed25519_verify_many)( - elems.as_ptr(), - elems.len() as u32, - size_of::() as u32, - num_packets as u32, - signature_offsets.len() as u32, - msg_sizes.as_ptr(), - pubkey_offsets.as_ptr(), - signature_offsets.as_ptr(), - msg_start_offsets.as_ptr(), - out.as_mut_ptr(), - USE_NON_DEFAULT_STREAM, - ); - if res != 0 { - trace!("RETURN!!!: {}", res); - } - } - trace!("done verify"); - copy_return_values(&sig_lens, &out, &mut rvs); - inc_new_counter_debug!("ed25519_verify_gpu", count); - recycler_out.recycle(out); - recycler.recycle(signature_offsets); - recycler.recycle(pubkey_offsets); - recycler.recycle(msg_sizes); - recycler.recycle(msg_start_offsets); - rvs -} - -#[cfg(test)] -pub fn make_packet_from_transaction(tx: Transaction) -> Packet { - use bincode::serialize; - - let tx_bytes = serialize(&tx).unwrap(); - let mut packet = Packet::default(); - packet.meta.size = tx_bytes.len(); - packet.data[..packet.meta.size].copy_from_slice(&tx_bytes); - return packet; -} - #[cfg(test)] mod tests { use super::*; - use crate::packet::{Packet, Packets}; - use crate::sigverify; - use crate::sigverify::PacketOffsets; - use crate::test_tx::{test_multisig_tx, test_tx}; - use bincode::{deserialize, serialize}; - use solana_sdk::hash::Hash; - use solana_sdk::message::{Message, MessageHeader}; - use solana_sdk::signature::Signature; - use solana_sdk::transaction::Transaction; - - const SIG_OFFSET: usize = 1; - - pub fn memfind(a: &[A], b: &[A]) -> Option { - assert!(a.len() >= b.len()); - let end = a.len() - b.len() + 1; - for i in 0..end { - if a[i..i + b.len()] == b[..] { - return Some(i); - } - } - None - } + use solana_perf::packet::Packet; #[test] - fn test_layout() { - let tx = test_tx(); - let tx_bytes = serialize(&tx).unwrap(); - let packet = serialize(&tx).unwrap(); - assert_matches!(memfind(&packet, &tx_bytes), Some(0)); - assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None); - } - - #[test] - fn test_system_transaction_layout() { - let tx = test_tx(); - let tx_bytes = serialize(&tx).unwrap(); - let message_data = tx.message_data(); - let packet = sigverify::make_packet_from_transaction(tx.clone()); - - let packet_offsets = sigverify::get_packet_offsets(&packet, 0); - - assert_eq!( - memfind(&tx_bytes, &tx.signatures[0].as_ref()), - Some(SIG_OFFSET) - ); - assert_eq!( - memfind(&tx_bytes, &tx.message().account_keys[0].as_ref()), - Some(packet_offsets.pubkey_start as usize) - ); - assert_eq!( - memfind(&tx_bytes, &message_data), - Some(packet_offsets.msg_start as usize) - ); - assert_eq!( - memfind(&tx_bytes, &tx.signatures[0].as_ref()), - Some(packet_offsets.sig_start as usize) - ); - assert_eq!(packet_offsets.sig_len, 1); - } - - fn packet_from_num_sigs(required_num_sigs: u8, actual_num_sigs: usize) -> Packet { - let message = Message { - header: MessageHeader { - num_required_signatures: required_num_sigs, - num_readonly_signed_accounts: 12, - num_readonly_unsigned_accounts: 11, - }, - account_keys: vec![], - recent_blockhash: Hash::default(), - instructions: vec![], - }; - let mut tx = Transaction::new_unsigned(message); - tx.signatures = vec![Signature::default(); actual_num_sigs as usize]; - sigverify::make_packet_from_transaction(tx) - } - - #[test] - fn test_untrustworthy_sigs() { - let required_num_sigs = 14; - let actual_num_sigs = 5; - - let packet = packet_from_num_sigs(required_num_sigs, actual_num_sigs); - - let unsanitized_packet_offsets = sigverify::do_get_packet_offsets(&packet, 0); - - assert_eq!( - unsanitized_packet_offsets, - Err(PacketError::MismatchSignatureLen) - ); - } - - #[test] - fn test_large_sigs() { - // use any large number to be misinterpreted as 2 bytes when decoded as short_vec - let required_num_sigs = 214; - let actual_num_sigs = 5; - - let packet = packet_from_num_sigs(required_num_sigs, actual_num_sigs); - - let unsanitized_packet_offsets = sigverify::do_get_packet_offsets(&packet, 0); - - assert_eq!( - unsanitized_packet_offsets, - Err(PacketError::MismatchSignatureLen) - ); - } - - #[test] - fn test_small_packet() { - let tx = test_tx(); - let mut packet = sigverify::make_packet_from_transaction(tx.clone()); - - packet.data[0] = 0xff; - packet.data[1] = 0xff; - packet.meta.size = 2; - - let res = sigverify::do_get_packet_offsets(&packet, 0); - assert_eq!(res, Err(PacketError::InvalidLen)); - } - - #[test] - fn test_large_sig_len() { - let tx = test_tx(); - let mut packet = sigverify::make_packet_from_transaction(tx.clone()); - - // Make the signatures len huge - packet.data[0] = 0x7f; - - let res = sigverify::do_get_packet_offsets(&packet, 0); - assert_eq!(res, Err(PacketError::InvalidSignatureLen)); - } - - #[test] - fn test_really_large_sig_len() { - let tx = test_tx(); - let mut packet = sigverify::make_packet_from_transaction(tx.clone()); - - // Make the signatures len huge - packet.data[0] = 0xff; - packet.data[1] = 0xff; - packet.data[2] = 0xff; - packet.data[3] = 0xff; - - let res = sigverify::do_get_packet_offsets(&packet, 0); - assert_eq!(res, Err(PacketError::InvalidShortVec)); - } - - #[test] - fn test_invalid_pubkey_len() { - let tx = test_tx(); - let mut packet = sigverify::make_packet_from_transaction(tx.clone()); - - let res = sigverify::do_get_packet_offsets(&packet, 0); - - // make pubkey len huge - packet.data[res.unwrap().pubkey_start as usize - 1] = 0x7f; - - let res = sigverify::do_get_packet_offsets(&packet, 0); - assert_eq!(res, Err(PacketError::InvalidPubkeyLen)); - } - - #[test] - fn test_fee_payer_is_writable() { - let message = Message { - header: MessageHeader { - num_required_signatures: 1, - num_readonly_signed_accounts: 1, - num_readonly_unsigned_accounts: 1, - }, - account_keys: vec![], - recent_blockhash: Hash::default(), - instructions: vec![], - }; - let mut tx = Transaction::new_unsigned(message); - tx.signatures = vec![Signature::default()]; - let packet = sigverify::make_packet_from_transaction(tx.clone()); - let res = sigverify::do_get_packet_offsets(&packet, 0); - - assert_eq!(res, Err(PacketError::PayerNotWritable)); - } - - #[test] - fn test_system_transaction_data_layout() { - use crate::packet::PACKET_DATA_SIZE; - let mut tx0 = test_tx(); - tx0.message.instructions[0].data = vec![1, 2, 3]; - let message0a = tx0.message_data(); - let tx_bytes = serialize(&tx0).unwrap(); - assert!(tx_bytes.len() < PACKET_DATA_SIZE); - assert_eq!( - memfind(&tx_bytes, &tx0.signatures[0].as_ref()), - Some(SIG_OFFSET) - ); - let tx1 = deserialize(&tx_bytes).unwrap(); - assert_eq!(tx0, tx1); - assert_eq!(tx1.message().instructions[0].data, vec![1, 2, 3]); - - tx0.message.instructions[0].data = vec![1, 2, 4]; - let message0b = tx0.message_data(); - assert_ne!(message0a, message0b); - } - - // Just like get_packet_offsets, but not returning redundant information. - fn get_packet_offsets_from_tx(tx: Transaction, current_offset: u32) -> PacketOffsets { - let packet = sigverify::make_packet_from_transaction(tx); - let packet_offsets = sigverify::get_packet_offsets(&packet, current_offset); - PacketOffsets::new( - packet_offsets.sig_len, - packet_offsets.sig_start - current_offset, - packet_offsets.msg_start - packet_offsets.sig_start, - packet_offsets.pubkey_start - packet_offsets.msg_start, - ) - } - - #[test] - fn test_get_packet_offsets() { - assert_eq!( - get_packet_offsets_from_tx(test_tx(), 0), - PacketOffsets::new(1, 1, 64, 4) - ); - assert_eq!( - get_packet_offsets_from_tx(test_tx(), 100), - PacketOffsets::new(1, 1, 64, 4) - ); - - // Ensure we're not indexing packet by the `current_offset` parameter. - assert_eq!( - get_packet_offsets_from_tx(test_tx(), 1_000_000), - PacketOffsets::new(1, 1, 64, 4) - ); - - // Ensure we're returning sig_len, not sig_size. - assert_eq!( - get_packet_offsets_from_tx(test_multisig_tx(), 0), - PacketOffsets::new(2, 1, 128, 4) - ); - } - - fn generate_packet_vec( - packet: &Packet, - num_packets_per_batch: usize, - num_batches: usize, - ) -> Vec { - // generate packet vector - let batches: Vec<_> = (0..num_batches) - .map(|_| { - let mut packets = Packets::default(); - packets.packets.resize(0, Packet::default()); - for _ in 0..num_packets_per_batch { - packets.packets.push(packet.clone()); - } - assert_eq!(packets.packets.len(), num_packets_per_batch); - packets - }) - .collect(); - assert_eq!(batches.len(), num_batches); - - batches - } - - fn test_verify_n(n: usize, modify_data: bool) { - let tx = test_tx(); - let mut packet = sigverify::make_packet_from_transaction(tx); - - // jumble some data to test failure - if modify_data { - packet.data[20] = packet.data[20].wrapping_add(10); - } - - let batches = generate_packet_vec(&packet, n, 2); - - let recycler = Recycler::default(); - let recycler_out = Recycler::default(); - // verify packets - let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); - - // check result - let ref_ans = if modify_data { 0u8 } else { 1u8 }; - assert_eq!(ans, vec![vec![ref_ans; n], vec![ref_ans; n]]); - } - - #[test] - fn test_verify_tampered_sig_len() { - let mut tx = test_tx().clone(); - // pretend malicious leader dropped a signature... - tx.signatures.pop(); - let packet = sigverify::make_packet_from_transaction(tx); - - let batches = generate_packet_vec(&packet, 1, 1); - - let recycler = Recycler::default(); - let recycler_out = Recycler::default(); - // verify packets - let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); - - assert_eq!(ans, vec![vec![0u8; 1]]); - } - - #[test] - fn test_verify_zero() { - test_verify_n(0, false); - } - - #[test] - fn test_verify_one() { - test_verify_n(1, false); - } - - #[test] - fn test_verify_seventy_one() { - test_verify_n(71, false); - } - - #[test] - fn test_verify_multisig() { - solana_logger::setup(); - - let tx = test_multisig_tx(); - let mut packet = sigverify::make_packet_from_transaction(tx); - - let n = 4; - let num_batches = 3; - let mut batches = generate_packet_vec(&packet, n, num_batches); - - packet.data[40] = packet.data[40].wrapping_add(8); - - batches[0].packets.push(packet); - - let recycler = Recycler::default(); - let recycler_out = Recycler::default(); - // verify packets - let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); - - // check result - let ref_ans = 1u8; - let mut ref_vec = vec![vec![ref_ans; n]; num_batches]; - ref_vec[0].push(0u8); - assert_eq!(ans, ref_vec); - } - - #[test] - fn test_verify_fail() { - test_verify_n(5, true); + fn test_mark_disabled() { + let mut batch = Packets::default(); + batch.packets.push(Packet::default()); + let mut batches: Vec = vec![batch]; + mark_disabled(&mut batches, &[vec![0]]); + assert_eq!(batches[0].packets[0].meta.discard, true); + mark_disabled(&mut batches, &[vec![1]]); + assert_eq!(batches[0].packets[0].meta.discard, false); } } diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index 5bc271b3c..33d5e0d64 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -1,29 +1,18 @@ #![allow(clippy::implicit_hasher)] -use crate::packet::{limited_deserialize, Packet, Packets}; +use crate::packet::{limited_deserialize, Packets}; use crate::sigverify::{self, TxOffset}; use crate::sigverify_stage::SigVerifier; -use rayon::iter::IndexedParallelIterator; -use rayon::iter::IntoParallelIterator; -use rayon::iter::IntoParallelRefMutIterator; -use rayon::iter::ParallelIterator; -use rayon::ThreadPool; -use sha2::{Digest, Sha512}; -use solana_ed25519_dalek::{Keypair, PublicKey, SecretKey}; use solana_ledger::bank_forks::BankForks; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; use solana_ledger::shred::ShredType; -use solana_metrics::inc_new_counter_debug; +use solana_ledger::sigverify_shreds::verify_shreds_gpu; use solana_perf::cuda_runtime::PinnedVec; -use solana_perf::perf_libs; use solana_perf::recycler::Recycler; -use solana_rayon_threadlimit::get_thread_count; use solana_sdk::signature::Signature; use std::collections::{HashMap, HashSet}; use std::mem::size_of; use std::sync::{Arc, RwLock}; -use std::cell::RefCell; - #[derive(Clone)] pub struct ShredSigVerifier { bank_forks: Arc>, @@ -95,727 +84,14 @@ impl SigVerifier for ShredSigVerifier { } } -thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() - .num_threads(get_thread_count()) - .thread_name(|ix| format!("sigverify_shreds_{}", ix)) - .build() - .unwrap())); - -/// Assuming layout is -/// signature: Signature -/// signed_msg: { -/// type: ShredType -/// slot: u64, -/// ... -/// } -/// Signature is the first thing in the packet, and slot is the first thing in the signed message. -fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap) -> Option { - let sig_start = 0; - let sig_end = size_of::(); - let slot_start = sig_end + size_of::(); - let slot_end = slot_start + size_of::(); - let msg_start = sig_end; - let msg_end = packet.meta.size; - trace!("slot start and end {} {}", slot_start, slot_end); - if packet.meta.size < slot_end { - return Some(0); - } - let slot: u64 = limited_deserialize(&packet.data[slot_start..slot_end]).ok()?; - trace!("slot {}", slot); - let pubkey = slot_leaders.get(&slot)?; - if packet.meta.size < sig_end { - return Some(0); - } - let signature = Signature::new(&packet.data[sig_start..sig_end]); - trace!("signature {}", signature); - if !signature.verify(pubkey, &packet.data[msg_start..msg_end]) { - return Some(0); - } - Some(1) -} - -fn verify_shreds_cpu(batches: &[Packets], slot_leaders: &HashMap) -> Vec> { - use rayon::prelude::*; - let count = sigverify::batch_size(batches); - debug!("CPU SHRED ECDSA for {}", count); - let rv = PAR_THREAD_POOL.with(|thread_pool| { - thread_pool.borrow().install(|| { - batches - .into_par_iter() - .map(|p| { - p.packets - .iter() - .map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0)) - .collect() - }) - .collect() - }) - }); - inc_new_counter_debug!("ed25519_shred_verify_cpu", count); - rv -} - -fn slot_key_data_for_gpu< - T: Sync + Sized + Default + std::fmt::Debug + Eq + std::hash::Hash + Clone + Copy, ->( - offset_start: usize, - batches: &[Packets], - slot_keys: &HashMap, - recycler_offsets: &Recycler, - recycler_keys: &Recycler>, -) -> (PinnedVec, TxOffset, usize) { - //TODO: mark Pubkey::default shreds as failed after the GPU returns - assert_eq!(slot_keys.get(&std::u64::MAX), Some(&T::default())); - let slots: Vec> = PAR_THREAD_POOL.with(|thread_pool| { - thread_pool.borrow().install(|| { - batches - .into_par_iter() - .map(|p| { - p.packets - .iter() - .map(|packet| { - let slot_start = size_of::() + size_of::(); - let slot_end = slot_start + size_of::(); - if packet.meta.size < slot_end { - return std::u64::MAX; - } - let slot: Option = - limited_deserialize(&packet.data[slot_start..slot_end]).ok(); - match slot { - Some(slot) if slot_keys.get(&slot).is_some() => slot, - _ => std::u64::MAX, - } - }) - .collect() - }) - .collect() - }) - }); - let mut keys_to_slots: HashMap> = HashMap::new(); - for batch in slots.iter() { - for slot in batch.iter() { - let key = slot_keys.get(slot).unwrap(); - keys_to_slots - .entry(*key) - .or_insert_with(|| vec![]) - .push(*slot); - } - } - let mut keyvec = recycler_keys.allocate("shred_gpu_pubkeys"); - let mut slot_to_key_ix = HashMap::new(); - for (i, (k, slots)) in keys_to_slots.iter().enumerate() { - keyvec.push(*k); - for s in slots { - slot_to_key_ix.insert(s, i); - } - } - let mut offsets = recycler_offsets.allocate("shred_offsets"); - slots.iter().for_each(|packet_slots| { - packet_slots.iter().for_each(|slot| { - offsets - .push((offset_start + (slot_to_key_ix.get(slot).unwrap() * size_of::())) as u32); - }); - }); - //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU - //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data - //Pad the Pubkeys buffer such that it is bigger than a buffer of Packet sized elems - let num_in_packets = - (keyvec.len() * size_of::() + (size_of::() - 1)) / size_of::(); - trace!("num_in_packets {}", num_in_packets); - //number of bytes missing - let missing = num_in_packets * size_of::() - keyvec.len() * size_of::(); - trace!("missing {}", missing); - //extra Pubkeys needed to fill the buffer - let extra = (missing + size_of::() - 1) / size_of::(); - trace!("extra {}", extra); - trace!("keyvec {}", keyvec.len()); - for _ in 0..extra { - keyvec.push(T::default()); - trace!("keyvec {}", keyvec.len()); - } - trace!("keyvec {:?}", keyvec); - trace!("offsets {:?}", offsets); - (keyvec, offsets, num_in_packets) -} - -fn shred_gpu_offsets( - mut pubkeys_end: usize, - batches: &[Packets], - recycler_offsets: &Recycler, -) -> (TxOffset, TxOffset, TxOffset, Vec>) { - let mut signature_offsets = recycler_offsets.allocate("shred_signatures"); - let mut msg_start_offsets = recycler_offsets.allocate("shred_msg_starts"); - let mut msg_sizes = recycler_offsets.allocate("shred_msg_sizes"); - let mut v_sig_lens = vec![]; - for batch in batches { - let mut sig_lens = Vec::new(); - for packet in &batch.packets { - let sig_start = pubkeys_end; - let sig_end = sig_start + size_of::(); - let msg_start = sig_end; - let msg_end = sig_start + packet.meta.size; - signature_offsets.push(sig_start as u32); - msg_start_offsets.push(msg_start as u32); - let msg_size = if msg_end < msg_start { - 0 - } else { - msg_end - msg_start - }; - msg_sizes.push(msg_size as u32); - sig_lens.push(1); - pubkeys_end += size_of::(); - } - v_sig_lens.push(sig_lens); - } - (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) -} - -fn verify_shreds_gpu( - batches: &[Packets], - slot_leaders: &HashMap, - recycler_offsets: &Recycler, - recycler_pubkeys: &Recycler>, - recycler_out: &Recycler>, -) -> Vec> { - let api = perf_libs::api(); - if api.is_none() { - return verify_shreds_cpu(batches, slot_leaders); - } - let api = api.unwrap(); - - let mut elems = Vec::new(); - let mut rvs = Vec::new(); - let count = sigverify::batch_size(batches); - let (pubkeys, pubkey_offsets, mut num_packets) = - slot_key_data_for_gpu(0, batches, slot_leaders, recycler_offsets, recycler_pubkeys); - //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU - //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data - let pubkeys_len = num_packets * size_of::(); - trace!("num_packets: {}", num_packets); - trace!("pubkeys_len: {}", pubkeys_len); - let (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) = - shred_gpu_offsets(pubkeys_len, batches, recycler_offsets); - let mut out = recycler_out.allocate("out_buffer"); - out.set_pinnable(); - elems.push( - perf_libs::Elems { - #![allow(clippy::cast_ptr_alignment)] - elems: pubkeys.as_ptr() as *const solana_sdk::packet::Packet, - num: num_packets as u32, - }, - ); - - for p in batches { - elems.push(perf_libs::Elems { - elems: p.packets.as_ptr(), - num: p.packets.len() as u32, - }); - let mut v = Vec::new(); - v.resize(p.packets.len(), 0); - rvs.push(v); - num_packets += p.packets.len(); - } - out.resize(signature_offsets.len(), 0); - - trace!("Starting verify num packets: {}", num_packets); - trace!("elem len: {}", elems.len() as u32); - trace!("packet sizeof: {}", size_of::() as u32); - const USE_NON_DEFAULT_STREAM: u8 = 1; - unsafe { - let res = (api.ed25519_verify_many)( - elems.as_ptr(), - elems.len() as u32, - size_of::() as u32, - num_packets as u32, - signature_offsets.len() as u32, - msg_sizes.as_ptr(), - pubkey_offsets.as_ptr(), - signature_offsets.as_ptr(), - msg_start_offsets.as_ptr(), - out.as_mut_ptr(), - USE_NON_DEFAULT_STREAM, - ); - if res != 0 { - trace!("RETURN!!!: {}", res); - } - } - trace!("done verify"); - trace!("out buf {:?}", out); - - sigverify::copy_return_values(&v_sig_lens, &out, &mut rvs); - - inc_new_counter_debug!("ed25519_shred_verify_gpu", count); - recycler_out.recycle(out); - recycler_offsets.recycle(signature_offsets); - recycler_offsets.recycle(pubkey_offsets); - recycler_offsets.recycle(msg_sizes); - recycler_offsets.recycle(msg_start_offsets); - recycler_pubkeys.recycle(pubkeys); - rvs -} - -/// Assuming layout is -/// signature: Signature -/// signed_msg: { -/// type: ShredType -/// slot: u64, -/// ... -/// } -/// Signature is the first thing in the packet, and slot is the first thing in the signed message. -fn sign_shred_cpu( - packet: &mut Packet, - slot_leaders_pubkeys: &HashMap, - slot_leaders_privkeys: &HashMap, -) { - let sig_start = 0; - let sig_end = sig_start + size_of::(); - let slot_start = sig_end + size_of::(); - let slot_end = slot_start + size_of::(); - let msg_start = sig_end; - let msg_end = packet.meta.size; - trace!("slot start and end {} {}", slot_start, slot_end); - assert!( - packet.meta.size >= slot_end, - "packet is not large enough for a slot" - ); - let slot: u64 = - limited_deserialize(&packet.data[slot_start..slot_end]).expect("can't deserialize slot"); - trace!("slot {}", slot); - let pubkey = slot_leaders_pubkeys - .get(&slot) - .expect("slot pubkey missing"); - let privkey = slot_leaders_privkeys - .get(&slot) - .expect("slot privkey missing"); - let keypair = Keypair { - secret: SecretKey::from_bytes(&privkey[0..32]).expect("dalek privkey parser"), - public: PublicKey::from_bytes(&pubkey[0..32]).expect("dalek pubkey parser"), - }; - assert!( - packet.meta.size >= sig_end, - "packet is not large enough for a signature" - ); - let signature = keypair.sign(&packet.data[msg_start..msg_end]); - trace!("signature {:?}", signature); - packet.data[0..sig_end].copy_from_slice(&signature.to_bytes()); -} - -fn sign_shreds_cpu( - batches: &mut [Packets], - slot_leaders_pubkeys: &HashMap, - slot_leaders_privkeys: &HashMap, -) { - use rayon::prelude::*; - let count = sigverify::batch_size(batches); - debug!("CPU SHRED ECDSA for {}", count); - PAR_THREAD_POOL.with(|thread_pool| { - thread_pool.borrow().install(|| { - batches.par_iter_mut().for_each(|p| { - p.packets.iter_mut().for_each(|mut p| { - sign_shred_cpu(&mut p, slot_leaders_pubkeys, slot_leaders_privkeys) - }); - }); - }) - }); - inc_new_counter_debug!("ed25519_shred_verify_cpu", count); -} - -pub fn sign_shreds_gpu( - batches: &mut [Packets], - slot_leaders_pubkeys: &HashMap, - slot_leaders_privkeys: &HashMap, - recycler_offsets: &Recycler, - recycler_pubkeys: &Recycler>, - recycler_secrets: &Recycler>, - recycler_out: &Recycler>, -) { - let sig_size = size_of::(); - let api = perf_libs::api(); - if api.is_none() { - return sign_shreds_cpu(batches, slot_leaders_pubkeys, slot_leaders_privkeys); - } - let slot_leaders_secrets: HashMap = slot_leaders_privkeys - .iter() - .map(|(k, v)| { - if *k == std::u64::MAX { - (*k, Signature::default()) - } else { - let mut hasher = Sha512::default(); - hasher.input(&v); - let mut result = hasher.result(); - result[0] &= 248; - result[31] &= 63; - result[31] |= 64; - let sig = Signature::new(result.as_slice()); - (*k, sig) - } - }) - .collect(); - let api = api.unwrap(); - - let mut elems = Vec::new(); - let count = sigverify::batch_size(batches); - let mut offset: usize = 0; - let mut num_packets = 0; - let (pubkeys, pubkey_offsets, num_pubkey_packets) = slot_key_data_for_gpu( - offset, - batches, - slot_leaders_pubkeys, - recycler_offsets, - recycler_pubkeys, - ); - offset += num_pubkey_packets * size_of::(); - num_packets += num_pubkey_packets; - trace!("offset: {}", offset); - let (secrets, secret_offsets, num_secret_packets) = slot_key_data_for_gpu( - offset, - batches, - &slot_leaders_secrets, - recycler_offsets, - recycler_secrets, - ); - offset += num_secret_packets * size_of::(); - num_packets += num_secret_packets; - //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU - //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data - trace!("offset: {}", offset); - let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) = - shred_gpu_offsets(offset, batches, recycler_offsets); - let total_sigs = signature_offsets.len(); - let mut signatures_out = recycler_out.allocate("ed25519 signatures"); - signatures_out.resize(total_sigs * sig_size, 0); - elems.push( - perf_libs::Elems { - #![allow(clippy::cast_ptr_alignment)] - elems: pubkeys.as_ptr() as *const solana_sdk::packet::Packet, - num: num_pubkey_packets as u32, - }, - ); - - elems.push( - perf_libs::Elems { - #![allow(clippy::cast_ptr_alignment)] - elems: secrets.as_ptr() as *const solana_sdk::packet::Packet, - num: num_secret_packets as u32, - }, - ); - - for p in batches.iter() { - elems.push(perf_libs::Elems { - elems: p.packets.as_ptr(), - num: p.packets.len() as u32, - }); - let mut v = Vec::new(); - v.resize(p.packets.len(), 0); - num_packets += p.packets.len(); - } - - trace!("Starting verify num packets: {}", num_packets); - trace!("elem len: {}", elems.len() as u32); - trace!("packet sizeof: {}", size_of::() as u32); - const USE_NON_DEFAULT_STREAM: u8 = 1; - unsafe { - let res = (api.ed25519_sign_many)( - elems.as_mut_ptr(), - elems.len() as u32, - size_of::() as u32, - num_packets as u32, - total_sigs as u32, - msg_sizes.as_ptr(), - pubkey_offsets.as_ptr(), - secret_offsets.as_ptr(), - msg_start_offsets.as_ptr(), - signatures_out.as_mut_ptr(), - USE_NON_DEFAULT_STREAM, - ); - if res != 0 { - trace!("RETURN!!!: {}", res); - } - } - trace!("done sign"); - let mut sizes: Vec = vec![0]; - sizes.extend(batches.iter().map(|b| b.packets.len())); - PAR_THREAD_POOL.with(|thread_pool| { - thread_pool.borrow().install(|| { - batches - .par_iter_mut() - .enumerate() - .for_each(|(batch_ix, batch)| { - let num_packets = sizes[batch_ix]; - batch - .packets - .iter_mut() - .enumerate() - .for_each(|(packet_ix, packet)| { - let sig_ix = packet_ix + num_packets; - let sig_start = sig_ix * sig_size; - let sig_end = sig_start + sig_size; - packet.data[0..sig_size] - .copy_from_slice(&signatures_out[sig_start..sig_end]); - }); - }); - }); - }); - inc_new_counter_debug!("ed25519_shred_sign_gpu", count); - recycler_out.recycle(signatures_out); - recycler_offsets.recycle(signature_offsets); - recycler_offsets.recycle(pubkey_offsets); - recycler_offsets.recycle(msg_sizes); - recycler_offsets.recycle(msg_start_offsets); - recycler_pubkeys.recycle(pubkeys); -} - #[cfg(test)] pub mod tests { use super::*; use crate::genesis_utils::create_genesis_block_with_leader; + use crate::packet::Packet; use solana_ledger::shred::{Shred, Shredder}; use solana_runtime::bank::Bank; use solana_sdk::signature::{Keypair, KeypairUtil}; - #[test] - fn test_sigverify_shred_cpu() { - solana_logger::setup(); - let mut packet = Packet::default(); - let slot = 0xdeadc0de; - let mut shred = Shred::new_from_data(slot, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true); - assert_eq!(shred.slot(), slot); - let keypair = Keypair::new(); - Shredder::sign_shred(&keypair, &mut shred); - trace!("signature {}", shred.common_header.signature); - packet.data[0..shred.payload.len()].copy_from_slice(&shred.payload); - packet.meta.size = shred.payload.len(); - - let leader_slots = [(slot, keypair.pubkey().to_bytes())] - .iter() - .cloned() - .collect(); - let rv = verify_shred_cpu(&packet, &leader_slots); - assert_eq!(rv, Some(1)); - - let wrong_keypair = Keypair::new(); - let leader_slots = [(slot, wrong_keypair.pubkey().to_bytes())] - .iter() - .cloned() - .collect(); - let rv = verify_shred_cpu(&packet, &leader_slots); - assert_eq!(rv, Some(0)); - - let leader_slots = HashMap::new(); - let rv = verify_shred_cpu(&packet, &leader_slots); - assert_eq!(rv, None); - } - - #[test] - fn test_sigverify_shreds_cpu() { - solana_logger::setup(); - let mut batch = [Packets::default()]; - let slot = 0xdeadc0de; - let mut shred = Shred::new_from_data(slot, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true); - let keypair = Keypair::new(); - Shredder::sign_shred(&keypair, &mut shred); - batch[0].packets.resize(1, Packet::default()); - batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batch[0].packets[0].meta.size = shred.payload.len(); - - let leader_slots = [(slot, keypair.pubkey().to_bytes())] - .iter() - .cloned() - .collect(); - let rv = verify_shreds_cpu(&batch, &leader_slots); - assert_eq!(rv, vec![vec![1]]); - - let wrong_keypair = Keypair::new(); - let leader_slots = [(slot, wrong_keypair.pubkey().to_bytes())] - .iter() - .cloned() - .collect(); - let rv = verify_shreds_cpu(&batch, &leader_slots); - assert_eq!(rv, vec![vec![0]]); - - let leader_slots = HashMap::new(); - let rv = verify_shreds_cpu(&batch, &leader_slots); - assert_eq!(rv, vec![vec![0]]); - - let leader_slots = [(slot, keypair.pubkey().to_bytes())] - .iter() - .cloned() - .collect(); - batch[0].packets[0].meta.size = 0; - let rv = verify_shreds_cpu(&batch, &leader_slots); - assert_eq!(rv, vec![vec![0]]); - } - - #[test] - fn test_sigverify_shreds_gpu() { - solana_logger::setup(); - let recycler_offsets = Recycler::default(); - let recycler_pubkeys = Recycler::default(); - let recycler_out = Recycler::default(); - - let mut batch = [Packets::default()]; - let slot = 0xdeadc0de; - let mut shred = Shred::new_from_data(slot, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true); - let keypair = Keypair::new(); - Shredder::sign_shred(&keypair, &mut shred); - batch[0].packets.resize(1, Packet::default()); - batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batch[0].packets[0].meta.size = shred.payload.len(); - - let leader_slots = [ - (slot, keypair.pubkey().to_bytes()), - (std::u64::MAX, [0u8; 32]), - ] - .iter() - .cloned() - .collect(); - let rv = verify_shreds_gpu( - &batch, - &leader_slots, - &recycler_offsets, - &recycler_pubkeys, - &recycler_out, - ); - assert_eq!(rv, vec![vec![1]]); - - let wrong_keypair = Keypair::new(); - let leader_slots = [ - (slot, wrong_keypair.pubkey().to_bytes()), - (std::u64::MAX, [0u8; 32]), - ] - .iter() - .cloned() - .collect(); - let rv = verify_shreds_gpu( - &batch, - &leader_slots, - &recycler_offsets, - &recycler_pubkeys, - &recycler_out, - ); - assert_eq!(rv, vec![vec![0]]); - - let leader_slots = [(std::u64::MAX, [0u8; 32])].iter().cloned().collect(); - let rv = verify_shreds_gpu( - &batch, - &leader_slots, - &recycler_offsets, - &recycler_pubkeys, - &recycler_out, - ); - assert_eq!(rv, vec![vec![0]]); - - batch[0].packets[0].meta.size = 0; - let leader_slots = [ - (slot, keypair.pubkey().to_bytes()), - (std::u64::MAX, [0u8; 32]), - ] - .iter() - .cloned() - .collect(); - let rv = verify_shreds_gpu( - &batch, - &leader_slots, - &recycler_offsets, - &recycler_pubkeys, - &recycler_out, - ); - assert_eq!(rv, vec![vec![0]]); - } - - #[test] - fn test_sigverify_shreds_sign_gpu() { - solana_logger::setup(); - let recycler_offsets = Recycler::default(); - let recycler_pubkeys = Recycler::default(); - let recycler_secrets = Recycler::default(); - let recycler_out = Recycler::default(); - - let mut batch = [Packets::default()]; - let slot = 0xdeadc0de; - let keypair = Keypair::new(); - let shred = Shred::new_from_data(slot, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true); - batch[0].packets.resize(1, Packet::default()); - batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batch[0].packets[0].meta.size = shred.payload.len(); - let pubkeys = [ - (slot, keypair.pubkey().to_bytes()), - (std::u64::MAX, [0u8; 32]), - ] - .iter() - .cloned() - .collect(); - let privkeys = [ - (slot, keypair.secret.to_bytes()), - (std::u64::MAX, [0u8; 32]), - ] - .iter() - .cloned() - .collect(); - //unsigned - let rv = verify_shreds_gpu( - &batch, - &pubkeys, - &recycler_offsets, - &recycler_pubkeys, - &recycler_out, - ); - assert_eq!(rv, vec![vec![0]]); - //signed - sign_shreds_gpu( - &mut batch, - &pubkeys, - &privkeys, - &recycler_offsets, - &recycler_pubkeys, - &recycler_secrets, - &recycler_out, - ); - let rv = verify_shreds_cpu(&batch, &pubkeys); - assert_eq!(rv, vec![vec![1]]); - - let rv = verify_shreds_gpu( - &batch, - &pubkeys, - &recycler_offsets, - &recycler_pubkeys, - &recycler_out, - ); - assert_eq!(rv, vec![vec![1]]); - } - - #[test] - fn test_sigverify_shreds_sign_cpu() { - solana_logger::setup(); - - let mut batch = [Packets::default()]; - let slot = 0xdeadc0de; - let keypair = Keypair::new(); - let shred = Shred::new_from_data(slot, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true); - batch[0].packets.resize(1, Packet::default()); - batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); - batch[0].packets[0].meta.size = shred.payload.len(); - let pubkeys = [ - (slot, keypair.pubkey().to_bytes()), - (std::u64::MAX, [0u8; 32]), - ] - .iter() - .cloned() - .collect(); - let privkeys = [ - (slot, keypair.secret.to_bytes()), - (std::u64::MAX, [0u8; 32]), - ] - .iter() - .cloned() - .collect(); - //unsigned - let rv = verify_shreds_cpu(&batch, &pubkeys); - assert_eq!(rv, vec![vec![0]]); - //signed - sign_shreds_cpu(&mut batch, &pubkeys, &privkeys); - let rv = verify_shreds_cpu(&batch, &pubkeys); - assert_eq!(rv, vec![vec![1]]); - } #[test] fn test_sigverify_shreds_read_slots() { diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index 9de090514..9828d218f 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -16,6 +16,7 @@ chrono = { version = "0.4.9", features = ["serde"] } dir-diff = "0.3.2" dlopen = "0.1.8" dlopen_derive = "0.1.4" +sha2 = "0.8.0" fs_extra = "1.1.0" itertools = "0.8.1" lazy_static = "1.4.0" @@ -33,6 +34,7 @@ solana-measure = { path = "../measure", version = "0.21.0" } solana-merkle-tree = { path = "../merkle-tree", version = "0.21.0" } solana-metrics = { path = "../metrics", version = "0.21.0" } solana-perf = { path = "../perf", version = "0.21.0" } +solana-ed25519-dalek = "0.2.0" solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.21.0" } solana-runtime = { path = "../runtime", version = "0.21.0" } solana-sdk = { path = "../sdk", version = "0.21.0" } diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index 6fe695f84..85af3d9b5 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -12,13 +12,16 @@ pub mod genesis_utils; pub mod leader_schedule; pub mod leader_schedule_cache; pub mod leader_schedule_utils; -pub mod packet; pub mod poh; pub mod rooted_slot_iterator; pub mod shred; +pub mod sigverify_shreds; pub mod snapshot_package; pub mod snapshot_utils; pub mod staking_utils; #[macro_use] extern crate solana_metrics; + +#[macro_use] +extern crate log; diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs new file mode 100644 index 000000000..b7d949788 --- /dev/null +++ b/ledger/src/sigverify_shreds.rs @@ -0,0 +1,742 @@ +#![allow(clippy::implicit_hasher)] +use crate::shred::ShredType; +use rayon::iter::IndexedParallelIterator; +use rayon::iter::IntoParallelIterator; +use rayon::iter::IntoParallelRefMutIterator; +use rayon::iter::ParallelIterator; +use rayon::ThreadPool; +use sha2::{Digest, Sha512}; +use solana_ed25519_dalek::{Keypair, PublicKey, SecretKey}; +use solana_metrics::inc_new_counter_debug; +use solana_perf::cuda_runtime::PinnedVec; +use solana_perf::packet::{limited_deserialize, Packet, Packets}; +use solana_perf::perf_libs; +use solana_perf::recycler::Recycler; +use solana_perf::sigverify::{self, TxOffset}; +use solana_rayon_threadlimit::get_thread_count; +use solana_sdk::signature::Signature; +use std::collections::HashMap; +use std::mem::size_of; + +use std::cell::RefCell; + +thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() + .num_threads(get_thread_count()) + .thread_name(|ix| format!("sigverify_shreds_{}", ix)) + .build() + .unwrap())); + +/// Assuming layout is +/// signature: Signature +/// signed_msg: { +/// type: ShredType +/// slot: u64, +/// ... +/// } +/// Signature is the first thing in the packet, and slot is the first thing in the signed message. +fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap) -> Option { + let sig_start = 0; + let sig_end = size_of::(); + let slot_start = sig_end + size_of::(); + let slot_end = slot_start + size_of::(); + let msg_start = sig_end; + let msg_end = packet.meta.size; + trace!("slot start and end {} {}", slot_start, slot_end); + if packet.meta.size < slot_end { + return Some(0); + } + let slot: u64 = limited_deserialize(&packet.data[slot_start..slot_end]).ok()?; + trace!("slot {}", slot); + let pubkey = slot_leaders.get(&slot)?; + if packet.meta.size < sig_end { + return Some(0); + } + let signature = Signature::new(&packet.data[sig_start..sig_end]); + trace!("signature {}", signature); + if !signature.verify(pubkey, &packet.data[msg_start..msg_end]) { + return Some(0); + } + Some(1) +} + +fn verify_shreds_cpu(batches: &[Packets], slot_leaders: &HashMap) -> Vec> { + use rayon::prelude::*; + let count = sigverify::batch_size(batches); + debug!("CPU SHRED ECDSA for {}", count); + let rv = PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + batches + .into_par_iter() + .map(|p| { + p.packets + .iter() + .map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0)) + .collect() + }) + .collect() + }) + }); + inc_new_counter_debug!("ed25519_shred_verify_cpu", count); + rv +} + +fn slot_key_data_for_gpu< + T: Sync + Sized + Default + std::fmt::Debug + Eq + std::hash::Hash + Clone + Copy, +>( + offset_start: usize, + batches: &[Packets], + slot_keys: &HashMap, + recycler_offsets: &Recycler, + recycler_keys: &Recycler>, +) -> (PinnedVec, TxOffset, usize) { + //TODO: mark Pubkey::default shreds as failed after the GPU returns + assert_eq!(slot_keys.get(&std::u64::MAX), Some(&T::default())); + let slots: Vec> = PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + batches + .into_par_iter() + .map(|p| { + p.packets + .iter() + .map(|packet| { + let slot_start = size_of::() + size_of::(); + let slot_end = slot_start + size_of::(); + if packet.meta.size < slot_end { + return std::u64::MAX; + } + let slot: Option = + limited_deserialize(&packet.data[slot_start..slot_end]).ok(); + match slot { + Some(slot) if slot_keys.get(&slot).is_some() => slot, + _ => std::u64::MAX, + } + }) + .collect() + }) + .collect() + }) + }); + let mut keys_to_slots: HashMap> = HashMap::new(); + for batch in slots.iter() { + for slot in batch.iter() { + let key = slot_keys.get(slot).unwrap(); + keys_to_slots + .entry(*key) + .or_insert_with(|| vec![]) + .push(*slot); + } + } + let mut keyvec = recycler_keys.allocate("shred_gpu_pubkeys"); + let mut slot_to_key_ix = HashMap::new(); + for (i, (k, slots)) in keys_to_slots.iter().enumerate() { + keyvec.push(*k); + for s in slots { + slot_to_key_ix.insert(s, i); + } + } + let mut offsets = recycler_offsets.allocate("shred_offsets"); + slots.iter().for_each(|packet_slots| { + packet_slots.iter().for_each(|slot| { + offsets + .push((offset_start + (slot_to_key_ix.get(slot).unwrap() * size_of::())) as u32); + }); + }); + //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU + //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data + //Pad the Pubkeys buffer such that it is bigger than a buffer of Packet sized elems + let num_in_packets = + (keyvec.len() * size_of::() + (size_of::() - 1)) / size_of::(); + trace!("num_in_packets {}", num_in_packets); + //number of bytes missing + let missing = num_in_packets * size_of::() - keyvec.len() * size_of::(); + trace!("missing {}", missing); + //extra Pubkeys needed to fill the buffer + let extra = (missing + size_of::() - 1) / size_of::(); + trace!("extra {}", extra); + trace!("keyvec {}", keyvec.len()); + for _ in 0..extra { + keyvec.push(T::default()); + trace!("keyvec {}", keyvec.len()); + } + trace!("keyvec {:?}", keyvec); + trace!("offsets {:?}", offsets); + (keyvec, offsets, num_in_packets) +} + +fn shred_gpu_offsets( + mut pubkeys_end: usize, + batches: &[Packets], + recycler_offsets: &Recycler, +) -> (TxOffset, TxOffset, TxOffset, Vec>) { + let mut signature_offsets = recycler_offsets.allocate("shred_signatures"); + let mut msg_start_offsets = recycler_offsets.allocate("shred_msg_starts"); + let mut msg_sizes = recycler_offsets.allocate("shred_msg_sizes"); + let mut v_sig_lens = vec![]; + for batch in batches { + let mut sig_lens = Vec::new(); + for packet in &batch.packets { + let sig_start = pubkeys_end; + let sig_end = sig_start + size_of::(); + let msg_start = sig_end; + let msg_end = sig_start + packet.meta.size; + signature_offsets.push(sig_start as u32); + msg_start_offsets.push(msg_start as u32); + let msg_size = if msg_end < msg_start { + 0 + } else { + msg_end - msg_start + }; + msg_sizes.push(msg_size as u32); + sig_lens.push(1); + pubkeys_end += size_of::(); + } + v_sig_lens.push(sig_lens); + } + (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) +} + +pub fn verify_shreds_gpu( + batches: &[Packets], + slot_leaders: &HashMap, + recycler_offsets: &Recycler, + recycler_pubkeys: &Recycler>, + recycler_out: &Recycler>, +) -> Vec> { + let api = perf_libs::api(); + if api.is_none() { + return verify_shreds_cpu(batches, slot_leaders); + } + let api = api.unwrap(); + + let mut elems = Vec::new(); + let mut rvs = Vec::new(); + let count = sigverify::batch_size(batches); + let (pubkeys, pubkey_offsets, mut num_packets) = + slot_key_data_for_gpu(0, batches, slot_leaders, recycler_offsets, recycler_pubkeys); + //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU + //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data + let pubkeys_len = num_packets * size_of::(); + trace!("num_packets: {}", num_packets); + trace!("pubkeys_len: {}", pubkeys_len); + let (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) = + shred_gpu_offsets(pubkeys_len, batches, recycler_offsets); + let mut out = recycler_out.allocate("out_buffer"); + out.set_pinnable(); + elems.push( + perf_libs::Elems { + #![allow(clippy::cast_ptr_alignment)] + elems: pubkeys.as_ptr() as *const solana_sdk::packet::Packet, + num: num_packets as u32, + }, + ); + + for p in batches { + elems.push(perf_libs::Elems { + elems: p.packets.as_ptr(), + num: p.packets.len() as u32, + }); + let mut v = Vec::new(); + v.resize(p.packets.len(), 0); + rvs.push(v); + num_packets += p.packets.len(); + } + out.resize(signature_offsets.len(), 0); + + trace!("Starting verify num packets: {}", num_packets); + trace!("elem len: {}", elems.len() as u32); + trace!("packet sizeof: {}", size_of::() as u32); + const USE_NON_DEFAULT_STREAM: u8 = 1; + unsafe { + let res = (api.ed25519_verify_many)( + elems.as_ptr(), + elems.len() as u32, + size_of::() as u32, + num_packets as u32, + signature_offsets.len() as u32, + msg_sizes.as_ptr(), + pubkey_offsets.as_ptr(), + signature_offsets.as_ptr(), + msg_start_offsets.as_ptr(), + out.as_mut_ptr(), + USE_NON_DEFAULT_STREAM, + ); + if res != 0 { + trace!("RETURN!!!: {}", res); + } + } + trace!("done verify"); + trace!("out buf {:?}", out); + + sigverify::copy_return_values(&v_sig_lens, &out, &mut rvs); + + inc_new_counter_debug!("ed25519_shred_verify_gpu", count); + recycler_out.recycle(out); + recycler_offsets.recycle(signature_offsets); + recycler_offsets.recycle(pubkey_offsets); + recycler_offsets.recycle(msg_sizes); + recycler_offsets.recycle(msg_start_offsets); + recycler_pubkeys.recycle(pubkeys); + rvs +} + +/// Assuming layout is +/// signature: Signature +/// signed_msg: { +/// type: ShredType +/// slot: u64, +/// ... +/// } +/// Signature is the first thing in the packet, and slot is the first thing in the signed message. +fn sign_shred_cpu( + packet: &mut Packet, + slot_leaders_pubkeys: &HashMap, + slot_leaders_privkeys: &HashMap, +) { + let sig_start = 0; + let sig_end = sig_start + size_of::(); + let slot_start = sig_end + size_of::(); + let slot_end = slot_start + size_of::(); + let msg_start = sig_end; + let msg_end = packet.meta.size; + trace!("slot start and end {} {}", slot_start, slot_end); + assert!( + packet.meta.size >= slot_end, + "packet is not large enough for a slot" + ); + let slot: u64 = + limited_deserialize(&packet.data[slot_start..slot_end]).expect("can't deserialize slot"); + trace!("slot {}", slot); + let pubkey = slot_leaders_pubkeys + .get(&slot) + .expect("slot pubkey missing"); + let privkey = slot_leaders_privkeys + .get(&slot) + .expect("slot privkey missing"); + let keypair = Keypair { + secret: SecretKey::from_bytes(&privkey[0..32]).expect("dalek privkey parser"), + public: PublicKey::from_bytes(&pubkey[0..32]).expect("dalek pubkey parser"), + }; + assert!( + packet.meta.size >= sig_end, + "packet is not large enough for a signature" + ); + let signature = keypair.sign(&packet.data[msg_start..msg_end]); + trace!("signature {:?}", signature); + packet.data[0..sig_end].copy_from_slice(&signature.to_bytes()); +} + +fn sign_shreds_cpu( + batches: &mut [Packets], + slot_leaders_pubkeys: &HashMap, + slot_leaders_privkeys: &HashMap, +) { + use rayon::prelude::*; + let count = sigverify::batch_size(batches); + debug!("CPU SHRED ECDSA for {}", count); + PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + batches.par_iter_mut().for_each(|p| { + p.packets.iter_mut().for_each(|mut p| { + sign_shred_cpu(&mut p, slot_leaders_pubkeys, slot_leaders_privkeys) + }); + }); + }) + }); + inc_new_counter_debug!("ed25519_shred_verify_cpu", count); +} + +pub fn sign_shreds_gpu( + batches: &mut [Packets], + slot_leaders_pubkeys: &HashMap, + slot_leaders_privkeys: &HashMap, + recycler_offsets: &Recycler, + recycler_pubkeys: &Recycler>, + recycler_secrets: &Recycler>, + recycler_out: &Recycler>, +) { + let sig_size = size_of::(); + let api = perf_libs::api(); + if api.is_none() { + return sign_shreds_cpu(batches, slot_leaders_pubkeys, slot_leaders_privkeys); + } + let slot_leaders_secrets: HashMap = slot_leaders_privkeys + .iter() + .map(|(k, v)| { + if *k == std::u64::MAX { + (*k, Signature::default()) + } else { + let mut hasher = Sha512::default(); + hasher.input(&v); + let mut result = hasher.result(); + result[0] &= 248; + result[31] &= 63; + result[31] |= 64; + let sig = Signature::new(result.as_slice()); + (*k, sig) + } + }) + .collect(); + let api = api.unwrap(); + + let mut elems = Vec::new(); + let count = sigverify::batch_size(batches); + let mut offset: usize = 0; + let mut num_packets = 0; + let (pubkeys, pubkey_offsets, num_pubkey_packets) = slot_key_data_for_gpu( + offset, + batches, + slot_leaders_pubkeys, + recycler_offsets, + recycler_pubkeys, + ); + offset += num_pubkey_packets * size_of::(); + num_packets += num_pubkey_packets; + trace!("offset: {}", offset); + let (secrets, secret_offsets, num_secret_packets) = slot_key_data_for_gpu( + offset, + batches, + &slot_leaders_secrets, + recycler_offsets, + recycler_secrets, + ); + offset += num_secret_packets * size_of::(); + num_packets += num_secret_packets; + //HACK: Pubkeys vector is passed along as a `Packets` buffer to the GPU + //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data + trace!("offset: {}", offset); + let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) = + shred_gpu_offsets(offset, batches, recycler_offsets); + let total_sigs = signature_offsets.len(); + let mut signatures_out = recycler_out.allocate("ed25519 signatures"); + signatures_out.resize(total_sigs * sig_size, 0); + elems.push( + perf_libs::Elems { + #![allow(clippy::cast_ptr_alignment)] + elems: pubkeys.as_ptr() as *const solana_sdk::packet::Packet, + num: num_pubkey_packets as u32, + }, + ); + + elems.push( + perf_libs::Elems { + #![allow(clippy::cast_ptr_alignment)] + elems: secrets.as_ptr() as *const solana_sdk::packet::Packet, + num: num_secret_packets as u32, + }, + ); + + for p in batches.iter() { + elems.push(perf_libs::Elems { + elems: p.packets.as_ptr(), + num: p.packets.len() as u32, + }); + let mut v = Vec::new(); + v.resize(p.packets.len(), 0); + num_packets += p.packets.len(); + } + + trace!("Starting verify num packets: {}", num_packets); + trace!("elem len: {}", elems.len() as u32); + trace!("packet sizeof: {}", size_of::() as u32); + const USE_NON_DEFAULT_STREAM: u8 = 1; + unsafe { + let res = (api.ed25519_sign_many)( + elems.as_mut_ptr(), + elems.len() as u32, + size_of::() as u32, + num_packets as u32, + total_sigs as u32, + msg_sizes.as_ptr(), + pubkey_offsets.as_ptr(), + secret_offsets.as_ptr(), + msg_start_offsets.as_ptr(), + signatures_out.as_mut_ptr(), + USE_NON_DEFAULT_STREAM, + ); + if res != 0 { + trace!("RETURN!!!: {}", res); + } + } + trace!("done sign"); + let mut sizes: Vec = vec![0]; + sizes.extend(batches.iter().map(|b| b.packets.len())); + PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + batches + .par_iter_mut() + .enumerate() + .for_each(|(batch_ix, batch)| { + let num_packets = sizes[batch_ix]; + batch + .packets + .iter_mut() + .enumerate() + .for_each(|(packet_ix, packet)| { + let sig_ix = packet_ix + num_packets; + let sig_start = sig_ix * sig_size; + let sig_end = sig_start + sig_size; + packet.data[0..sig_size] + .copy_from_slice(&signatures_out[sig_start..sig_end]); + }); + }); + }); + }); + inc_new_counter_debug!("ed25519_shred_sign_gpu", count); + recycler_out.recycle(signatures_out); + recycler_offsets.recycle(signature_offsets); + recycler_offsets.recycle(pubkey_offsets); + recycler_offsets.recycle(msg_sizes); + recycler_offsets.recycle(msg_start_offsets); + recycler_pubkeys.recycle(pubkeys); +} + +#[cfg(test)] +pub mod tests { + use super::*; + use crate::shred::{Shred, Shredder}; + use solana_sdk::signature::{Keypair, KeypairUtil}; + #[test] + fn test_sigverify_shred_cpu() { + solana_logger::setup(); + let mut packet = Packet::default(); + let slot = 0xdeadc0de; + let mut shred = Shred::new_from_data(slot, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true); + assert_eq!(shred.slot(), slot); + let keypair = Keypair::new(); + Shredder::sign_shred(&keypair, &mut shred); + trace!("signature {}", shred.common_header.signature); + packet.data[0..shred.payload.len()].copy_from_slice(&shred.payload); + packet.meta.size = shred.payload.len(); + + let leader_slots = [(slot, keypair.pubkey().to_bytes())] + .iter() + .cloned() + .collect(); + let rv = verify_shred_cpu(&packet, &leader_slots); + assert_eq!(rv, Some(1)); + + let wrong_keypair = Keypair::new(); + let leader_slots = [(slot, wrong_keypair.pubkey().to_bytes())] + .iter() + .cloned() + .collect(); + let rv = verify_shred_cpu(&packet, &leader_slots); + assert_eq!(rv, Some(0)); + + let leader_slots = HashMap::new(); + let rv = verify_shred_cpu(&packet, &leader_slots); + assert_eq!(rv, None); + } + + #[test] + fn test_sigverify_shreds_cpu() { + solana_logger::setup(); + let mut batch = [Packets::default()]; + let slot = 0xdeadc0de; + let mut shred = Shred::new_from_data(slot, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true); + let keypair = Keypair::new(); + Shredder::sign_shred(&keypair, &mut shred); + batch[0].packets.resize(1, Packet::default()); + batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[0].packets[0].meta.size = shred.payload.len(); + + let leader_slots = [(slot, keypair.pubkey().to_bytes())] + .iter() + .cloned() + .collect(); + let rv = verify_shreds_cpu(&batch, &leader_slots); + assert_eq!(rv, vec![vec![1]]); + + let wrong_keypair = Keypair::new(); + let leader_slots = [(slot, wrong_keypair.pubkey().to_bytes())] + .iter() + .cloned() + .collect(); + let rv = verify_shreds_cpu(&batch, &leader_slots); + assert_eq!(rv, vec![vec![0]]); + + let leader_slots = HashMap::new(); + let rv = verify_shreds_cpu(&batch, &leader_slots); + assert_eq!(rv, vec![vec![0]]); + + let leader_slots = [(slot, keypair.pubkey().to_bytes())] + .iter() + .cloned() + .collect(); + batch[0].packets[0].meta.size = 0; + let rv = verify_shreds_cpu(&batch, &leader_slots); + assert_eq!(rv, vec![vec![0]]); + } + + #[test] + fn test_sigverify_shreds_gpu() { + solana_logger::setup(); + let recycler_offsets = Recycler::default(); + let recycler_pubkeys = Recycler::default(); + let recycler_out = Recycler::default(); + + let mut batch = [Packets::default()]; + let slot = 0xdeadc0de; + let mut shred = Shred::new_from_data(slot, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true); + let keypair = Keypair::new(); + Shredder::sign_shred(&keypair, &mut shred); + batch[0].packets.resize(1, Packet::default()); + batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[0].packets[0].meta.size = shred.payload.len(); + + let leader_slots = [ + (slot, keypair.pubkey().to_bytes()), + (std::u64::MAX, [0u8; 32]), + ] + .iter() + .cloned() + .collect(); + let rv = verify_shreds_gpu( + &batch, + &leader_slots, + &recycler_offsets, + &recycler_pubkeys, + &recycler_out, + ); + assert_eq!(rv, vec![vec![1]]); + + let wrong_keypair = Keypair::new(); + let leader_slots = [ + (slot, wrong_keypair.pubkey().to_bytes()), + (std::u64::MAX, [0u8; 32]), + ] + .iter() + .cloned() + .collect(); + let rv = verify_shreds_gpu( + &batch, + &leader_slots, + &recycler_offsets, + &recycler_pubkeys, + &recycler_out, + ); + assert_eq!(rv, vec![vec![0]]); + + let leader_slots = [(std::u64::MAX, [0u8; 32])].iter().cloned().collect(); + let rv = verify_shreds_gpu( + &batch, + &leader_slots, + &recycler_offsets, + &recycler_pubkeys, + &recycler_out, + ); + assert_eq!(rv, vec![vec![0]]); + + batch[0].packets[0].meta.size = 0; + let leader_slots = [ + (slot, keypair.pubkey().to_bytes()), + (std::u64::MAX, [0u8; 32]), + ] + .iter() + .cloned() + .collect(); + let rv = verify_shreds_gpu( + &batch, + &leader_slots, + &recycler_offsets, + &recycler_pubkeys, + &recycler_out, + ); + assert_eq!(rv, vec![vec![0]]); + } + + #[test] + fn test_sigverify_shreds_sign_gpu() { + solana_logger::setup(); + let recycler_offsets = Recycler::default(); + let recycler_pubkeys = Recycler::default(); + let recycler_secrets = Recycler::default(); + let recycler_out = Recycler::default(); + + let mut batch = [Packets::default()]; + let slot = 0xdeadc0de; + let keypair = Keypair::new(); + let shred = Shred::new_from_data(slot, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true); + batch[0].packets.resize(1, Packet::default()); + batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[0].packets[0].meta.size = shred.payload.len(); + let pubkeys = [ + (slot, keypair.pubkey().to_bytes()), + (std::u64::MAX, [0u8; 32]), + ] + .iter() + .cloned() + .collect(); + let privkeys = [ + (slot, keypair.secret.to_bytes()), + (std::u64::MAX, [0u8; 32]), + ] + .iter() + .cloned() + .collect(); + //unsigned + let rv = verify_shreds_gpu( + &batch, + &pubkeys, + &recycler_offsets, + &recycler_pubkeys, + &recycler_out, + ); + assert_eq!(rv, vec![vec![0]]); + //signed + sign_shreds_gpu( + &mut batch, + &pubkeys, + &privkeys, + &recycler_offsets, + &recycler_pubkeys, + &recycler_secrets, + &recycler_out, + ); + let rv = verify_shreds_cpu(&batch, &pubkeys); + assert_eq!(rv, vec![vec![1]]); + + let rv = verify_shreds_gpu( + &batch, + &pubkeys, + &recycler_offsets, + &recycler_pubkeys, + &recycler_out, + ); + assert_eq!(rv, vec![vec![1]]); + } + + #[test] + fn test_sigverify_shreds_sign_cpu() { + solana_logger::setup(); + + let mut batch = [Packets::default()]; + let slot = 0xdeadc0de; + let keypair = Keypair::new(); + let shred = Shred::new_from_data(slot, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), true, true); + batch[0].packets.resize(1, Packet::default()); + batch[0].packets[0].data[0..shred.payload.len()].copy_from_slice(&shred.payload); + batch[0].packets[0].meta.size = shred.payload.len(); + let pubkeys = [ + (slot, keypair.pubkey().to_bytes()), + (std::u64::MAX, [0u8; 32]), + ] + .iter() + .cloned() + .collect(); + let privkeys = [ + (slot, keypair.secret.to_bytes()), + (std::u64::MAX, [0u8; 32]), + ] + .iter() + .cloned() + .collect(); + //unsigned + let rv = verify_shreds_cpu(&batch, &pubkeys); + assert_eq!(rv, vec![vec![0]]); + //signed + sign_shreds_cpu(&mut batch, &pubkeys, &privkeys); + let rv = verify_shreds_cpu(&batch, &pubkeys); + assert_eq!(rv, vec![vec![1]]); + } +} diff --git a/perf/Cargo.toml b/perf/Cargo.toml index b390e3fab..2b8318f34 100644 --- a/perf/Cargo.toml +++ b/perf/Cargo.toml @@ -11,10 +11,23 @@ edition = "2018" [dependencies] rand = "0.6.5" dlopen = "0.1.8" +bincode = "1.2.0" +rayon = "1.2.0" +serde = "1.0.102" +serde_derive = "1.0.102" dlopen_derive = "0.1.4" log = "0.4.8" solana-sdk = { path = "../sdk", version = "0.21.0" } +solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.21.0" } +solana-budget-api = { path = "../programs/budget_api", version = "0.21.0" } +solana-logger = { path = "../logger", version = "0.21.0" } +solana-metrics = { path = "../metrics", version = "0.21.0" } [lib] name = "solana_perf" +[dev-dependencies] +matches = "0.1.6" + +[[bench]] +name = "sigverify" diff --git a/core/benches/sigverify.rs b/perf/benches/sigverify.rs similarity index 90% rename from core/benches/sigverify.rs rename to perf/benches/sigverify.rs index 0777a99ca..49d057044 100644 --- a/core/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -2,10 +2,10 @@ extern crate test; -use solana_core::packet::to_packets; -use solana_core::sigverify; -use solana_core::test_tx::test_tx; +use solana_perf::packet::to_packets; use solana_perf::recycler::Recycler; +use solana_perf::sigverify; +use solana_perf::test_tx::test_tx; use test::Bencher; #[bench] diff --git a/perf/src/lib.rs b/perf/src/lib.rs index 8a1feb59a..04f2246d6 100644 --- a/perf/src/lib.rs +++ b/perf/src/lib.rs @@ -1,6 +1,16 @@ pub mod cuda_runtime; +pub mod packet; pub mod perf_libs; pub mod recycler; +pub mod sigverify; +pub mod test_tx; #[macro_use] extern crate log; + +#[cfg(test)] +#[macro_use] +extern crate matches; + +#[macro_use] +extern crate solana_metrics; diff --git a/ledger/src/packet.rs b/perf/src/packet.rs similarity index 55% rename from ledger/src/packet.rs rename to perf/src/packet.rs index 1f124950e..321e2de11 100644 --- a/ledger/src/packet.rs +++ b/perf/src/packet.rs @@ -1,10 +1,11 @@ //! The `packet` module defines data structures and methods to pull data from the network. -use solana_perf::{ +use crate::{ cuda_runtime::PinnedVec, recycler::{Recycler, Reset}, }; +use serde::Serialize; pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}; -use std::{mem, net::SocketAddr}; +use std::{io, mem, net::SocketAddr}; pub const NUM_PACKETS: usize = 1024 * 8; @@ -72,9 +73,41 @@ impl Packets { } } +pub fn to_packets_chunked(xs: &[T], chunks: usize) -> Vec { + let mut out = vec![]; + for x in xs.chunks(chunks) { + let mut p = Packets::default(); + p.packets.resize(x.len(), Packet::default()); + for (i, o) in x.iter().zip(p.packets.iter_mut()) { + let mut wr = io::Cursor::new(&mut o.data[..]); + bincode::serialize_into(&mut wr, &i).expect("serialize request"); + let len = wr.position() as usize; + o.meta.size = len; + } + out.push(p); + } + out +} + +pub fn to_packets(xs: &[T]) -> Vec { + to_packets_chunked(xs, NUM_PACKETS) +} + +pub fn limited_deserialize(data: &[u8]) -> bincode::Result +where + T: serde::de::DeserializeOwned, +{ + bincode::config() + .limit(PACKET_DATA_SIZE as u64) + .deserialize(data) +} + #[cfg(test)] mod tests { use super::*; + use solana_sdk::hash::Hash; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::system_transaction; #[test] fn test_packets_reset() { @@ -84,4 +117,23 @@ mod tests { packets.reset(); assert_eq!(packets.packets.len(), 0); } + + #[test] + fn test_to_packets() { + let keypair = Keypair::new(); + let hash = Hash::new(&[1; 32]); + let tx = system_transaction::transfer(&keypair, &keypair.pubkey(), 1, hash); + let rv = to_packets(&vec![tx.clone(); 1]); + assert_eq!(rv.len(), 1); + assert_eq!(rv[0].packets.len(), 1); + + let rv = to_packets(&vec![tx.clone(); NUM_PACKETS]); + assert_eq!(rv.len(), 1); + assert_eq!(rv[0].packets.len(), NUM_PACKETS); + + let rv = to_packets(&vec![tx.clone(); NUM_PACKETS + 1]); + assert_eq!(rv.len(), 2); + assert_eq!(rv[0].packets.len(), NUM_PACKETS); + assert_eq!(rv[1].packets.len(), 1); + } } diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs new file mode 100644 index 000000000..a44c27ce1 --- /dev/null +++ b/perf/src/sigverify.rs @@ -0,0 +1,734 @@ +//! The `sigverify` module provides digital signature verification functions. +//! By default, signatures are verified in parallel using all available CPU +//! cores. When perf-libs are available signature verification is offloaded +//! to the GPU. +//! + +use crate::cuda_runtime::PinnedVec; +use crate::packet::{Packet, Packets}; +use crate::perf_libs; +use crate::recycler::Recycler; +use bincode::serialized_size; +use rayon::ThreadPool; +use solana_metrics::inc_new_counter_debug; +use solana_rayon_threadlimit::get_thread_count; +use solana_sdk::message::MessageHeader; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::short_vec::decode_len; +use solana_sdk::signature::Signature; +#[cfg(test)] +use solana_sdk::transaction::Transaction; +use std::cell::RefCell; +use std::mem::size_of; + +thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() + .num_threads(get_thread_count()) + .thread_name(|ix| format!("sigverify_{}", ix)) + .build() + .unwrap())); + +pub type TxOffset = PinnedVec; + +type TxOffsets = (TxOffset, TxOffset, TxOffset, TxOffset, Vec>); + +#[derive(Debug, PartialEq, Eq)] +struct PacketOffsets { + pub sig_len: u32, + pub sig_start: u32, + pub msg_start: u32, + pub pubkey_start: u32, +} + +impl PacketOffsets { + pub fn new(sig_len: u32, sig_start: u32, msg_start: u32, pubkey_start: u32) -> Self { + Self { + sig_len, + sig_start, + msg_start, + pubkey_start, + } + } +} + +#[derive(Debug, PartialEq)] +pub enum PacketError { + InvalidLen, + InvalidPubkeyLen, + InvalidShortVec, + InvalidSignatureLen, + MismatchSignatureLen, + PayerNotWritable, +} + +impl std::convert::From> for PacketError { + fn from(_e: std::boxed::Box) -> PacketError { + PacketError::InvalidShortVec + } +} + +pub fn init() { + if let Some(api) = perf_libs::api() { + unsafe { + (api.ed25519_set_verbose)(true); + if !(api.ed25519_init)() { + panic!("ed25519_init() failed"); + } + (api.ed25519_set_verbose)(false); + } + } +} + +fn verify_packet(packet: &Packet) -> u8 { + let packet_offsets = get_packet_offsets(packet, 0); + let mut sig_start = packet_offsets.sig_start as usize; + let mut pubkey_start = packet_offsets.pubkey_start as usize; + let msg_start = packet_offsets.msg_start as usize; + + if packet_offsets.sig_len == 0 { + return 0; + } + + if packet.meta.size <= msg_start { + return 0; + } + + let msg_end = packet.meta.size; + for _ in 0..packet_offsets.sig_len { + let pubkey_end = pubkey_start as usize + size_of::(); + let sig_end = sig_start as usize + size_of::(); + + if pubkey_end >= packet.meta.size || sig_end >= packet.meta.size { + return 0; + } + + let signature = Signature::new(&packet.data[sig_start..sig_end]); + if !signature.verify( + &packet.data[pubkey_start..pubkey_end], + &packet.data[msg_start..msg_end], + ) { + return 0; + } + pubkey_start += size_of::(); + sig_start += size_of::(); + } + 1 +} + +pub fn batch_size(batches: &[Packets]) -> usize { + batches.iter().map(|p| p.packets.len()).sum() +} + +// internal function to be unit-tested; should be used only by get_packet_offsets +fn do_get_packet_offsets( + packet: &Packet, + current_offset: u32, +) -> Result { + let message_header_size = serialized_size(&MessageHeader::default()).unwrap() as usize; + // should have at least 1 signature, sig lengths and the message header + if (1 + size_of::() + message_header_size) > packet.meta.size { + return Err(PacketError::InvalidLen); + } + + // read the length of Transaction.signatures (serialized with short_vec) + let (sig_len_untrusted, sig_size) = decode_len(&packet.data)?; + + // Using msg_start_offset which is based on sig_len_untrusted introduces uncertainty. + // Ultimately, the actual sigverify will determine the uncertainty. + let msg_start_offset = sig_size + sig_len_untrusted * size_of::(); + + // Packet should have data at least for signatures, MessageHeader, 1 byte for Message.account_keys.len + if (msg_start_offset + message_header_size + 1) > packet.meta.size { + return Err(PacketError::InvalidSignatureLen); + } + + // read MessageHeader.num_required_signatures (serialized with u8) + let sig_len_maybe_trusted = packet.data[msg_start_offset] as usize; + + let message_account_keys_len_offset = msg_start_offset + message_header_size; + + // This reads and compares the MessageHeader num_required_signatures and + // num_readonly_signed_accounts bytes. If num_required_signatures is not larger than + // num_readonly_signed_accounts, the first account is not debitable, and cannot be charged + // required transaction fees. + if packet.data[msg_start_offset] <= packet.data[msg_start_offset + 1] { + return Err(PacketError::PayerNotWritable); + } + + // read the length of Message.account_keys (serialized with short_vec) + let (pubkey_len, pubkey_len_size) = + decode_len(&packet.data[message_account_keys_len_offset..])?; + + if (message_account_keys_len_offset + pubkey_len * size_of::() + pubkey_len_size) + > packet.meta.size + { + return Err(PacketError::InvalidPubkeyLen); + } + + let sig_start = current_offset as usize + sig_size; + let msg_start = current_offset as usize + msg_start_offset; + let pubkey_start = msg_start + message_header_size + pubkey_len_size; + + if sig_len_maybe_trusted != sig_len_untrusted { + return Err(PacketError::MismatchSignatureLen); + } + + Ok(PacketOffsets::new( + sig_len_untrusted as u32, + sig_start as u32, + msg_start as u32, + pubkey_start as u32, + )) +} + +fn get_packet_offsets(packet: &Packet, current_offset: u32) -> PacketOffsets { + let unsanitized_packet_offsets = do_get_packet_offsets(packet, current_offset); + if let Ok(offsets) = unsanitized_packet_offsets { + offsets + } else { + // force sigverify to fail by returning zeros + PacketOffsets::new(0, 0, 0, 0) + } +} + +pub fn generate_offsets( + batches: &[Packets], + recycler: &Recycler, +) -> Result { + debug!("allocating.."); + let mut signature_offsets: PinnedVec<_> = recycler.allocate("sig_offsets"); + signature_offsets.set_pinnable(); + let mut pubkey_offsets: PinnedVec<_> = recycler.allocate("pubkey_offsets"); + pubkey_offsets.set_pinnable(); + let mut msg_start_offsets: PinnedVec<_> = recycler.allocate("msg_start_offsets"); + msg_start_offsets.set_pinnable(); + let mut msg_sizes: PinnedVec<_> = recycler.allocate("msg_size_offsets"); + msg_sizes.set_pinnable(); + let mut current_packet = 0; + let mut v_sig_lens = Vec::new(); + batches.iter().for_each(|p| { + let mut sig_lens = Vec::new(); + p.packets.iter().for_each(|packet| { + let current_offset = current_packet as u32 * size_of::() as u32; + + let packet_offsets = get_packet_offsets(packet, current_offset); + + sig_lens.push(packet_offsets.sig_len); + + trace!("pubkey_offset: {}", packet_offsets.pubkey_start); + + let mut pubkey_offset = packet_offsets.pubkey_start; + let mut sig_offset = packet_offsets.sig_start; + for _ in 0..packet_offsets.sig_len { + signature_offsets.push(sig_offset); + sig_offset += size_of::() as u32; + + pubkey_offsets.push(pubkey_offset); + pubkey_offset += size_of::() as u32; + + msg_start_offsets.push(packet_offsets.msg_start); + + msg_sizes + .push(current_offset + (packet.meta.size as u32) - packet_offsets.msg_start); + } + current_packet += 1; + }); + v_sig_lens.push(sig_lens); + }); + Ok(( + signature_offsets, + pubkey_offsets, + msg_start_offsets, + msg_sizes, + v_sig_lens, + )) +} + +pub fn ed25519_verify_cpu(batches: &[Packets]) -> Vec> { + use rayon::prelude::*; + let count = batch_size(batches); + debug!("CPU ECDSA for {}", batch_size(batches)); + let rv = PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + batches + .into_par_iter() + .map(|p| p.packets.par_iter().map(verify_packet).collect()) + .collect() + }) + }); + inc_new_counter_debug!("ed25519_verify_cpu", count); + rv +} + +pub fn ed25519_verify_disabled(batches: &[Packets]) -> Vec> { + use rayon::prelude::*; + let count = batch_size(batches); + debug!("disabled ECDSA for {}", batch_size(batches)); + let rv = batches + .into_par_iter() + .map(|p| vec![1u8; p.packets.len()]) + .collect(); + inc_new_counter_debug!("ed25519_verify_disabled", count); + rv +} + +pub fn copy_return_values(sig_lens: &[Vec], out: &PinnedVec, rvs: &mut Vec>) { + let mut num = 0; + for (vs, sig_vs) in rvs.iter_mut().zip(sig_lens.iter()) { + for (v, sig_v) in vs.iter_mut().zip(sig_vs.iter()) { + if *sig_v == 0 { + *v = 0; + } else { + let mut vout = 1; + for _ in 0..*sig_v { + if 0 == out[num] { + vout = 0; + } + num += 1; + } + *v = vout; + } + if *v != 0 { + trace!("VERIFIED PACKET!!!!!"); + } + } + } +} + +pub fn ed25519_verify( + batches: &[Packets], + recycler: &Recycler, + recycler_out: &Recycler>, +) -> Vec> { + let api = perf_libs::api(); + if api.is_none() { + return ed25519_verify_cpu(batches); + } + let api = api.unwrap(); + + use crate::packet::PACKET_DATA_SIZE; + let count = batch_size(batches); + + // micro-benchmarks show GPU time for smallest batch around 15-20ms + // and CPU speed for 64-128 sigverifies around 10-20ms. 64 is a nice + // power-of-two number around that accounting for the fact that the CPU + // may be busy doing other things while being a real validator + // TODO: dynamically adjust this crossover + if count < 64 { + return ed25519_verify_cpu(batches); + } + + let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) = + generate_offsets(batches, recycler).unwrap(); + + debug!("CUDA ECDSA for {}", batch_size(batches)); + debug!("allocating out.."); + let mut out = recycler_out.allocate("out_buffer"); + out.set_pinnable(); + let mut elems = Vec::new(); + let mut rvs = Vec::new(); + + let mut num_packets = 0; + for p in batches { + elems.push(perf_libs::Elems { + elems: p.packets.as_ptr(), + num: p.packets.len() as u32, + }); + let mut v = Vec::new(); + v.resize(p.packets.len(), 0); + rvs.push(v); + num_packets += p.packets.len(); + } + out.resize(signature_offsets.len(), 0); + trace!("Starting verify num packets: {}", num_packets); + trace!("elem len: {}", elems.len() as u32); + trace!("packet sizeof: {}", size_of::() as u32); + trace!("len offset: {}", PACKET_DATA_SIZE as u32); + const USE_NON_DEFAULT_STREAM: u8 = 1; + unsafe { + let res = (api.ed25519_verify_many)( + elems.as_ptr(), + elems.len() as u32, + size_of::() as u32, + num_packets as u32, + signature_offsets.len() as u32, + msg_sizes.as_ptr(), + pubkey_offsets.as_ptr(), + signature_offsets.as_ptr(), + msg_start_offsets.as_ptr(), + out.as_mut_ptr(), + USE_NON_DEFAULT_STREAM, + ); + if res != 0 { + trace!("RETURN!!!: {}", res); + } + } + trace!("done verify"); + copy_return_values(&sig_lens, &out, &mut rvs); + inc_new_counter_debug!("ed25519_verify_gpu", count); + recycler_out.recycle(out); + recycler.recycle(signature_offsets); + recycler.recycle(pubkey_offsets); + recycler.recycle(msg_sizes); + recycler.recycle(msg_start_offsets); + rvs +} + +#[cfg(test)] +pub fn make_packet_from_transaction(tx: Transaction) -> Packet { + use bincode::serialize; + + let tx_bytes = serialize(&tx).unwrap(); + let mut packet = Packet::default(); + packet.meta.size = tx_bytes.len(); + packet.data[..packet.meta.size].copy_from_slice(&tx_bytes); + return packet; +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::packet::{Packet, Packets}; + use crate::sigverify; + use crate::sigverify::PacketOffsets; + use crate::test_tx::{test_multisig_tx, test_tx}; + use bincode::{deserialize, serialize}; + use solana_sdk::hash::Hash; + use solana_sdk::message::{Message, MessageHeader}; + use solana_sdk::signature::Signature; + use solana_sdk::transaction::Transaction; + + const SIG_OFFSET: usize = 1; + + pub fn memfind(a: &[A], b: &[A]) -> Option { + assert!(a.len() >= b.len()); + let end = a.len() - b.len() + 1; + for i in 0..end { + if a[i..i + b.len()] == b[..] { + return Some(i); + } + } + None + } + + #[test] + fn test_layout() { + let tx = test_tx(); + let tx_bytes = serialize(&tx).unwrap(); + let packet = serialize(&tx).unwrap(); + assert_matches!(memfind(&packet, &tx_bytes), Some(0)); + assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None); + } + + #[test] + fn test_system_transaction_layout() { + let tx = test_tx(); + let tx_bytes = serialize(&tx).unwrap(); + let message_data = tx.message_data(); + let packet = sigverify::make_packet_from_transaction(tx.clone()); + + let packet_offsets = sigverify::get_packet_offsets(&packet, 0); + + assert_eq!( + memfind(&tx_bytes, &tx.signatures[0].as_ref()), + Some(SIG_OFFSET) + ); + assert_eq!( + memfind(&tx_bytes, &tx.message().account_keys[0].as_ref()), + Some(packet_offsets.pubkey_start as usize) + ); + assert_eq!( + memfind(&tx_bytes, &message_data), + Some(packet_offsets.msg_start as usize) + ); + assert_eq!( + memfind(&tx_bytes, &tx.signatures[0].as_ref()), + Some(packet_offsets.sig_start as usize) + ); + assert_eq!(packet_offsets.sig_len, 1); + } + + fn packet_from_num_sigs(required_num_sigs: u8, actual_num_sigs: usize) -> Packet { + let message = Message { + header: MessageHeader { + num_required_signatures: required_num_sigs, + num_readonly_signed_accounts: 12, + num_readonly_unsigned_accounts: 11, + }, + account_keys: vec![], + recent_blockhash: Hash::default(), + instructions: vec![], + }; + let mut tx = Transaction::new_unsigned(message); + tx.signatures = vec![Signature::default(); actual_num_sigs as usize]; + sigverify::make_packet_from_transaction(tx) + } + + #[test] + fn test_untrustworthy_sigs() { + let required_num_sigs = 14; + let actual_num_sigs = 5; + + let packet = packet_from_num_sigs(required_num_sigs, actual_num_sigs); + + let unsanitized_packet_offsets = sigverify::do_get_packet_offsets(&packet, 0); + + assert_eq!( + unsanitized_packet_offsets, + Err(PacketError::MismatchSignatureLen) + ); + } + + #[test] + fn test_large_sigs() { + // use any large number to be misinterpreted as 2 bytes when decoded as short_vec + let required_num_sigs = 214; + let actual_num_sigs = 5; + + let packet = packet_from_num_sigs(required_num_sigs, actual_num_sigs); + + let unsanitized_packet_offsets = sigverify::do_get_packet_offsets(&packet, 0); + + assert_eq!( + unsanitized_packet_offsets, + Err(PacketError::MismatchSignatureLen) + ); + } + + #[test] + fn test_small_packet() { + let tx = test_tx(); + let mut packet = sigverify::make_packet_from_transaction(tx.clone()); + + packet.data[0] = 0xff; + packet.data[1] = 0xff; + packet.meta.size = 2; + + let res = sigverify::do_get_packet_offsets(&packet, 0); + assert_eq!(res, Err(PacketError::InvalidLen)); + } + + #[test] + fn test_large_sig_len() { + let tx = test_tx(); + let mut packet = sigverify::make_packet_from_transaction(tx.clone()); + + // Make the signatures len huge + packet.data[0] = 0x7f; + + let res = sigverify::do_get_packet_offsets(&packet, 0); + assert_eq!(res, Err(PacketError::InvalidSignatureLen)); + } + + #[test] + fn test_really_large_sig_len() { + let tx = test_tx(); + let mut packet = sigverify::make_packet_from_transaction(tx.clone()); + + // Make the signatures len huge + packet.data[0] = 0xff; + packet.data[1] = 0xff; + packet.data[2] = 0xff; + packet.data[3] = 0xff; + + let res = sigverify::do_get_packet_offsets(&packet, 0); + assert_eq!(res, Err(PacketError::InvalidShortVec)); + } + + #[test] + fn test_invalid_pubkey_len() { + let tx = test_tx(); + let mut packet = sigverify::make_packet_from_transaction(tx.clone()); + + let res = sigverify::do_get_packet_offsets(&packet, 0); + + // make pubkey len huge + packet.data[res.unwrap().pubkey_start as usize - 1] = 0x7f; + + let res = sigverify::do_get_packet_offsets(&packet, 0); + assert_eq!(res, Err(PacketError::InvalidPubkeyLen)); + } + + #[test] + fn test_fee_payer_is_debitable() { + let message = Message { + header: MessageHeader { + num_required_signatures: 1, + num_readonly_signed_accounts: 1, + num_readonly_unsigned_accounts: 1, + }, + account_keys: vec![], + recent_blockhash: Hash::default(), + instructions: vec![], + }; + let mut tx = Transaction::new_unsigned(message); + tx.signatures = vec![Signature::default()]; + let packet = sigverify::make_packet_from_transaction(tx.clone()); + let res = sigverify::do_get_packet_offsets(&packet, 0); + + assert_eq!(res, Err(PacketError::PayerNotWritable)); + } + + #[test] + fn test_system_transaction_data_layout() { + use crate::packet::PACKET_DATA_SIZE; + let mut tx0 = test_tx(); + tx0.message.instructions[0].data = vec![1, 2, 3]; + let message0a = tx0.message_data(); + let tx_bytes = serialize(&tx0).unwrap(); + assert!(tx_bytes.len() < PACKET_DATA_SIZE); + assert_eq!( + memfind(&tx_bytes, &tx0.signatures[0].as_ref()), + Some(SIG_OFFSET) + ); + let tx1 = deserialize(&tx_bytes).unwrap(); + assert_eq!(tx0, tx1); + assert_eq!(tx1.message().instructions[0].data, vec![1, 2, 3]); + + tx0.message.instructions[0].data = vec![1, 2, 4]; + let message0b = tx0.message_data(); + assert_ne!(message0a, message0b); + } + + // Just like get_packet_offsets, but not returning redundant information. + fn get_packet_offsets_from_tx(tx: Transaction, current_offset: u32) -> PacketOffsets { + let packet = sigverify::make_packet_from_transaction(tx); + let packet_offsets = sigverify::get_packet_offsets(&packet, current_offset); + PacketOffsets::new( + packet_offsets.sig_len, + packet_offsets.sig_start - current_offset, + packet_offsets.msg_start - packet_offsets.sig_start, + packet_offsets.pubkey_start - packet_offsets.msg_start, + ) + } + + #[test] + fn test_get_packet_offsets() { + assert_eq!( + get_packet_offsets_from_tx(test_tx(), 0), + PacketOffsets::new(1, 1, 64, 4) + ); + assert_eq!( + get_packet_offsets_from_tx(test_tx(), 100), + PacketOffsets::new(1, 1, 64, 4) + ); + + // Ensure we're not indexing packet by the `current_offset` parameter. + assert_eq!( + get_packet_offsets_from_tx(test_tx(), 1_000_000), + PacketOffsets::new(1, 1, 64, 4) + ); + + // Ensure we're returning sig_len, not sig_size. + assert_eq!( + get_packet_offsets_from_tx(test_multisig_tx(), 0), + PacketOffsets::new(2, 1, 128, 4) + ); + } + + fn generate_packet_vec( + packet: &Packet, + num_packets_per_batch: usize, + num_batches: usize, + ) -> Vec { + // generate packet vector + let batches: Vec<_> = (0..num_batches) + .map(|_| { + let mut packets = Packets::default(); + packets.packets.resize(0, Packet::default()); + for _ in 0..num_packets_per_batch { + packets.packets.push(packet.clone()); + } + assert_eq!(packets.packets.len(), num_packets_per_batch); + packets + }) + .collect(); + assert_eq!(batches.len(), num_batches); + + batches + } + + fn test_verify_n(n: usize, modify_data: bool) { + let tx = test_tx(); + let mut packet = sigverify::make_packet_from_transaction(tx); + + // jumble some data to test failure + if modify_data { + packet.data[20] = packet.data[20].wrapping_add(10); + } + + let batches = generate_packet_vec(&packet, n, 2); + + let recycler = Recycler::default(); + let recycler_out = Recycler::default(); + // verify packets + let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); + + // check result + let ref_ans = if modify_data { 0u8 } else { 1u8 }; + assert_eq!(ans, vec![vec![ref_ans; n], vec![ref_ans; n]]); + } + + #[test] + fn test_verify_tampered_sig_len() { + let mut tx = test_tx().clone(); + // pretend malicious leader dropped a signature... + tx.signatures.pop(); + let packet = sigverify::make_packet_from_transaction(tx); + + let batches = generate_packet_vec(&packet, 1, 1); + + let recycler = Recycler::default(); + let recycler_out = Recycler::default(); + // verify packets + let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); + + assert_eq!(ans, vec![vec![0u8; 1]]); + } + + #[test] + fn test_verify_zero() { + test_verify_n(0, false); + } + + #[test] + fn test_verify_one() { + test_verify_n(1, false); + } + + #[test] + fn test_verify_seventy_one() { + test_verify_n(71, false); + } + + #[test] + fn test_verify_multisig() { + solana_logger::setup(); + + let tx = test_multisig_tx(); + let mut packet = sigverify::make_packet_from_transaction(tx); + + let n = 4; + let num_batches = 3; + let mut batches = generate_packet_vec(&packet, n, num_batches); + + packet.data[40] = packet.data[40].wrapping_add(8); + + batches[0].packets.push(packet); + + let recycler = Recycler::default(); + let recycler_out = Recycler::default(); + // verify packets + let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); + + // check result + let ref_ans = 1u8; + let mut ref_vec = vec![vec![ref_ans; n]; num_batches]; + ref_vec[0].push(0u8); + assert_eq!(ans, ref_vec); + } + + #[test] + fn test_verify_fail() { + test_verify_n(5, true); + } +} diff --git a/core/src/test_tx.rs b/perf/src/test_tx.rs similarity index 100% rename from core/src/test_tx.rs rename to perf/src/test_tx.rs