entry: Split `start_verify_transactions` into `*_{cpu,gpu}` (#29962)

`start_verify_transactions` is relatively long and contains both a small part of the CPU verification and the full GPU verification code. As CPU and GPU verification are equal peers, it seems easier to follow the code when they each live in separate functions.

No functional changes. Only moving code around.
This commit is contained in:
Illia Bobyr 2023-02-13 11:52:01 -08:00 committed by GitHub
parent dd9d6e308c
commit a425cab518
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 124 additions and 107 deletions

View File

@ -432,34 +432,50 @@ pub fn start_verify_transactions(
.is_some();
if use_cpu {
let verify_func = {
let verification_mode = if skip_verification {
TransactionVerificationMode::HashOnly
} else {
TransactionVerificationMode::FullVerification
};
move |versioned_tx: VersionedTransaction| -> Result<SanitizedTransaction> {
verify(versioned_tx, verification_mode)
}
start_verify_transactions_cpu(entries, skip_verification, verify)
} else {
start_verify_transactions_gpu(entries, verify_recyclers, verify)
}
}
fn start_verify_transactions_cpu(
entries: Vec<Entry>,
skip_verification: bool,
verify: Arc<
dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result<SanitizedTransaction>
+ Send
+ Sync,
>,
) -> Result<EntrySigVerificationState> {
let verify_func = {
let mode = if skip_verification {
TransactionVerificationMode::HashOnly
} else {
TransactionVerificationMode::FullVerification
};
let entries = verify_transactions(entries, Arc::new(verify_func));
move |versioned_tx| verify(versioned_tx, mode)
};
match entries {
Ok(entries_val) => {
return Ok(EntrySigVerificationState {
verification_status: EntryVerificationStatus::Success,
entries: Some(entries_val),
device_verification_data: DeviceSigVerificationData::Cpu(),
gpu_verify_duration_us: 0,
});
}
Err(err) => {
return Err(err);
}
}
}
let entries = verify_transactions(entries, Arc::new(verify_func))?;
Ok(EntrySigVerificationState {
verification_status: EntryVerificationStatus::Success,
entries: Some(entries),
device_verification_data: DeviceSigVerificationData::Cpu(),
gpu_verify_duration_us: 0,
})
}
fn start_verify_transactions_gpu(
entries: Vec<Entry>,
verify_recyclers: VerifyRecyclers,
verify: Arc<
dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result<SanitizedTransaction>
+ Send
+ Sync,
>,
) -> Result<EntrySigVerificationState> {
let verify_func = {
move |versioned_tx: VersionedTransaction| -> Result<SanitizedTransaction> {
verify(
@ -468,93 +484,94 @@ pub fn start_verify_transactions(
)
}
};
let entries = verify_transactions(entries, Arc::new(verify_func));
match entries {
Ok(entries) => {
let entry_txs: Vec<&SanitizedTransaction> = entries
.iter()
.filter_map(|entry_type| match entry_type {
EntryType::Tick(_) => None,
EntryType::Transactions(transactions) => Some(transactions),
})
.flatten()
.collect::<Vec<_>>();
if entry_txs.is_empty() {
return Ok(EntrySigVerificationState {
verification_status: EntryVerificationStatus::Success,
entries: Some(entries),
device_verification_data: DeviceSigVerificationData::Cpu(),
gpu_verify_duration_us: 0,
});
}
let entries = verify_transactions(entries, Arc::new(verify_func))?;
let mut packet_batches = entry_txs
.par_iter()
.chunks(PACKETS_PER_BATCH)
.map(|slice| {
let vec_size = slice.len();
let mut packet_batch = PacketBatch::new_with_recycler(
verify_recyclers.packet_recycler.clone(),
vec_size,
"entry-sig-verify",
);
// We use set_len here instead of resize(vec_size, Packet::default()), to save
// memory bandwidth and avoid writing a large amount of data that will be overwritten
// soon afterwards. As well, Packet::default() actually leaves the packet data
// uninitialized, so the initialization would simply write junk into
// the vector anyway.
unsafe {
packet_batch.set_len(vec_size);
}
let entry_tx_iter = slice
.into_par_iter()
.map(|tx| tx.to_versioned_transaction());
let entry_txs: Vec<&SanitizedTransaction> = entries
.iter()
.filter_map(|entry_type| match entry_type {
EntryType::Tick(_) => None,
EntryType::Transactions(transactions) => Some(transactions),
})
.flatten()
.collect::<Vec<_>>();
let res = packet_batch.par_iter_mut().zip(entry_tx_iter).all(|pair| {
*pair.0.meta_mut() = Meta::default();
Packet::populate_packet(pair.0, None, &pair.1).is_ok()
});
if res {
Ok(packet_batch)
} else {
Err(TransactionError::SanitizeFailure)
}
})
.collect::<Result<Vec<_>>>()?;
let tx_offset_recycler = verify_recyclers.tx_offset_recycler;
let out_recycler = verify_recyclers.out_recycler;
let num_packets = entry_txs.len();
let gpu_verify_thread = thread::Builder::new()
.name("solGpuSigVerify".into())
.spawn(move || {
let mut verify_time = Measure::start("sigverify");
sigverify::ed25519_verify(
&mut packet_batches,
&tx_offset_recycler,
&out_recycler,
false,
num_packets,
);
let verified = packet_batches
.iter()
.all(|batch| batch.iter().all(|p| !p.meta().discard()));
verify_time.stop();
(verified, verify_time.as_us())
})
.unwrap();
Ok(EntrySigVerificationState {
verification_status: EntryVerificationStatus::Pending,
entries: Some(entries),
device_verification_data: DeviceSigVerificationData::Gpu(GpuSigVerificationData {
thread_h: Some(gpu_verify_thread),
}),
gpu_verify_duration_us: 0,
})
}
Err(err) => Err(err),
if entry_txs.is_empty() {
return Ok(EntrySigVerificationState {
verification_status: EntryVerificationStatus::Success,
entries: Some(entries),
device_verification_data: DeviceSigVerificationData::Cpu(),
gpu_verify_duration_us: 0,
});
}
let mut packet_batches = entry_txs
.par_iter()
.chunks(PACKETS_PER_BATCH)
.map(|slice| {
let vec_size = slice.len();
let mut packet_batch = PacketBatch::new_with_recycler(
verify_recyclers.packet_recycler.clone(),
vec_size,
"entry-sig-verify",
);
// We use set_len here instead of resize(vec_size, Packet::default()), to save
// memory bandwidth and avoid writing a large amount of data that will be overwritten
// soon afterwards. As well, Packet::default() actually leaves the packet data
// uninitialized, so the initialization would simply write junk into
// the vector anyway.
unsafe {
packet_batch.set_len(vec_size);
}
let entry_tx_iter = slice
.into_par_iter()
.map(|tx| tx.to_versioned_transaction());
let res = packet_batch
.par_iter_mut()
.zip(entry_tx_iter)
.all(|(packet, tx)| {
*packet.meta_mut() = Meta::default();
Packet::populate_packet(packet, None, &tx).is_ok()
});
if res {
Ok(packet_batch)
} else {
Err(TransactionError::SanitizeFailure)
}
})
.collect::<Result<Vec<_>>>()?;
let tx_offset_recycler = verify_recyclers.tx_offset_recycler;
let out_recycler = verify_recyclers.out_recycler;
let num_packets = entry_txs.len();
let gpu_verify_thread = thread::Builder::new()
.name("solGpuSigVerify".into())
.spawn(move || {
let mut verify_time = Measure::start("sigverify");
sigverify::ed25519_verify(
&mut packet_batches,
&tx_offset_recycler,
&out_recycler,
false,
num_packets,
);
let verified = packet_batches
.iter()
.all(|batch| batch.iter().all(|p| !p.meta().discard()));
verify_time.stop();
(verified, verify_time.as_us())
})
.unwrap();
Ok(EntrySigVerificationState {
verification_status: EntryVerificationStatus::Pending,
entries: Some(entries),
device_verification_data: DeviceSigVerificationData::Gpu(GpuSigVerificationData {
thread_h: Some(gpu_verify_thread),
}),
gpu_verify_duration_us: 0,
})
}
fn compare_hashes(computed_hash: Hash, ref_entry: &Entry) -> bool {