From a425cab518b755b1e979ffd5d854ce14513a711f Mon Sep 17 00:00:00 2001 From: Illia Bobyr Date: Mon, 13 Feb 2023 11:52:01 -0800 Subject: [PATCH] entry: Split `start_verify_transactions` into `*_{cpu,gpu}` (#29962) `start_verify_transactions` is relatively long and contains both a small part of the CPU verification and the full GPU verification code. As CPU and GPU verification are equal peers, it seems easier to follow the code when they each live in separate functions. No functional changes. Only moving code around. --- entry/src/entry.rs | 231 ++++++++++++++++++++++++--------------------- 1 file changed, 124 insertions(+), 107 deletions(-) diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 4e55ea16ec..9427e9b008 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -432,34 +432,50 @@ pub fn start_verify_transactions( .is_some(); if use_cpu { - let verify_func = { - let verification_mode = if skip_verification { - TransactionVerificationMode::HashOnly - } else { - TransactionVerificationMode::FullVerification - }; - move |versioned_tx: VersionedTransaction| -> Result { - verify(versioned_tx, verification_mode) - } + start_verify_transactions_cpu(entries, skip_verification, verify) + } else { + start_verify_transactions_gpu(entries, verify_recyclers, verify) + } +} + +fn start_verify_transactions_cpu( + entries: Vec, + skip_verification: bool, + verify: Arc< + dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result + + Send + + Sync, + >, +) -> Result { + let verify_func = { + let mode = if skip_verification { + TransactionVerificationMode::HashOnly + } else { + TransactionVerificationMode::FullVerification }; - let entries = verify_transactions(entries, Arc::new(verify_func)); + move |versioned_tx| verify(versioned_tx, mode) + }; - match entries { - Ok(entries_val) => { - return Ok(EntrySigVerificationState { - verification_status: EntryVerificationStatus::Success, - entries: Some(entries_val), - device_verification_data: DeviceSigVerificationData::Cpu(), - gpu_verify_duration_us: 0, - }); - } - Err(err) => { - return Err(err); - } - } - } + let entries = verify_transactions(entries, Arc::new(verify_func))?; + Ok(EntrySigVerificationState { + verification_status: EntryVerificationStatus::Success, + entries: Some(entries), + device_verification_data: DeviceSigVerificationData::Cpu(), + gpu_verify_duration_us: 0, + }) +} + +fn start_verify_transactions_gpu( + entries: Vec, + verify_recyclers: VerifyRecyclers, + verify: Arc< + dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result + + Send + + Sync, + >, +) -> Result { let verify_func = { move |versioned_tx: VersionedTransaction| -> Result { verify( @@ -468,93 +484,94 @@ pub fn start_verify_transactions( ) } }; - let entries = verify_transactions(entries, Arc::new(verify_func)); - match entries { - Ok(entries) => { - let entry_txs: Vec<&SanitizedTransaction> = entries - .iter() - .filter_map(|entry_type| match entry_type { - EntryType::Tick(_) => None, - EntryType::Transactions(transactions) => Some(transactions), - }) - .flatten() - .collect::>(); - if entry_txs.is_empty() { - return Ok(EntrySigVerificationState { - verification_status: EntryVerificationStatus::Success, - entries: Some(entries), - device_verification_data: DeviceSigVerificationData::Cpu(), - gpu_verify_duration_us: 0, - }); - } + let entries = verify_transactions(entries, Arc::new(verify_func))?; - let mut packet_batches = entry_txs - .par_iter() - .chunks(PACKETS_PER_BATCH) - .map(|slice| { - let vec_size = slice.len(); - let mut packet_batch = PacketBatch::new_with_recycler( - verify_recyclers.packet_recycler.clone(), - vec_size, - "entry-sig-verify", - ); - // We use set_len here instead of resize(vec_size, Packet::default()), to save - // memory bandwidth and avoid writing a large amount of data that will be overwritten - // soon afterwards. As well, Packet::default() actually leaves the packet data - // uninitialized, so the initialization would simply write junk into - // the vector anyway. - unsafe { - packet_batch.set_len(vec_size); - } - let entry_tx_iter = slice - .into_par_iter() - .map(|tx| tx.to_versioned_transaction()); + let entry_txs: Vec<&SanitizedTransaction> = entries + .iter() + .filter_map(|entry_type| match entry_type { + EntryType::Tick(_) => None, + EntryType::Transactions(transactions) => Some(transactions), + }) + .flatten() + .collect::>(); - let res = packet_batch.par_iter_mut().zip(entry_tx_iter).all(|pair| { - *pair.0.meta_mut() = Meta::default(); - Packet::populate_packet(pair.0, None, &pair.1).is_ok() - }); - if res { - Ok(packet_batch) - } else { - Err(TransactionError::SanitizeFailure) - } - }) - .collect::>>()?; - - let tx_offset_recycler = verify_recyclers.tx_offset_recycler; - let out_recycler = verify_recyclers.out_recycler; - let num_packets = entry_txs.len(); - let gpu_verify_thread = thread::Builder::new() - .name("solGpuSigVerify".into()) - .spawn(move || { - let mut verify_time = Measure::start("sigverify"); - sigverify::ed25519_verify( - &mut packet_batches, - &tx_offset_recycler, - &out_recycler, - false, - num_packets, - ); - let verified = packet_batches - .iter() - .all(|batch| batch.iter().all(|p| !p.meta().discard())); - verify_time.stop(); - (verified, verify_time.as_us()) - }) - .unwrap(); - Ok(EntrySigVerificationState { - verification_status: EntryVerificationStatus::Pending, - entries: Some(entries), - device_verification_data: DeviceSigVerificationData::Gpu(GpuSigVerificationData { - thread_h: Some(gpu_verify_thread), - }), - gpu_verify_duration_us: 0, - }) - } - Err(err) => Err(err), + if entry_txs.is_empty() { + return Ok(EntrySigVerificationState { + verification_status: EntryVerificationStatus::Success, + entries: Some(entries), + device_verification_data: DeviceSigVerificationData::Cpu(), + gpu_verify_duration_us: 0, + }); } + + let mut packet_batches = entry_txs + .par_iter() + .chunks(PACKETS_PER_BATCH) + .map(|slice| { + let vec_size = slice.len(); + let mut packet_batch = PacketBatch::new_with_recycler( + verify_recyclers.packet_recycler.clone(), + vec_size, + "entry-sig-verify", + ); + // We use set_len here instead of resize(vec_size, Packet::default()), to save + // memory bandwidth and avoid writing a large amount of data that will be overwritten + // soon afterwards. As well, Packet::default() actually leaves the packet data + // uninitialized, so the initialization would simply write junk into + // the vector anyway. + unsafe { + packet_batch.set_len(vec_size); + } + let entry_tx_iter = slice + .into_par_iter() + .map(|tx| tx.to_versioned_transaction()); + + let res = packet_batch + .par_iter_mut() + .zip(entry_tx_iter) + .all(|(packet, tx)| { + *packet.meta_mut() = Meta::default(); + Packet::populate_packet(packet, None, &tx).is_ok() + }); + if res { + Ok(packet_batch) + } else { + Err(TransactionError::SanitizeFailure) + } + }) + .collect::>>()?; + + let tx_offset_recycler = verify_recyclers.tx_offset_recycler; + let out_recycler = verify_recyclers.out_recycler; + let num_packets = entry_txs.len(); + let gpu_verify_thread = thread::Builder::new() + .name("solGpuSigVerify".into()) + .spawn(move || { + let mut verify_time = Measure::start("sigverify"); + sigverify::ed25519_verify( + &mut packet_batches, + &tx_offset_recycler, + &out_recycler, + false, + num_packets, + ); + let verified = packet_batches + .iter() + .all(|batch| batch.iter().all(|p| !p.meta().discard())); + verify_time.stop(); + (verified, verify_time.as_us()) + }) + .unwrap(); + + Ok(EntrySigVerificationState { + verification_status: EntryVerificationStatus::Pending, + entries: Some(entries), + device_verification_data: DeviceSigVerificationData::Gpu(GpuSigVerificationData { + thread_h: Some(gpu_verify_thread), + }), + gpu_verify_duration_us: 0, + }) } fn compare_hashes(computed_hash: Hash, ref_entry: &Entry) -> bool {