diff --git a/Cargo.lock b/Cargo.lock index e18aca859..3676a0881 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3759,6 +3759,7 @@ dependencies = [ "bincode 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "dlopen 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "dlopen_derive 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/src/archiver.rs b/core/src/archiver.rs index 64786ea23..cee05b402 100644 --- a/core/src/archiver.rs +++ b/core/src/archiver.rs @@ -921,7 +921,7 @@ impl Archiver { let res = r_reader.recv_timeout(Duration::new(1, 0)); if let Ok(mut packets) = res { while let Ok(mut more) = r_reader.try_recv() { - packets.packets.append(&mut more.packets); + packets.packets.append_pinned(&mut more.packets); } let shreds: Vec = packets .packets diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index be9b15488..1b19b0cfa 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -16,6 +16,7 @@ use solana_ledger::{ }; use solana_measure::measure::Measure; use solana_metrics::{inc_new_counter_debug, inc_new_counter_info, inc_new_counter_warn}; +use solana_perf::cuda_runtime::PinnedVec; use solana_perf::perf_libs; use solana_runtime::{accounts_db::ErrorCounters, bank::Bank, transaction_batch::TransactionBatch}; use solana_sdk::{ @@ -789,7 +790,7 @@ impl BankingStage { filtered_unprocessed_packet_indexes } - fn generate_packet_indexes(vers: &[Packet]) -> Vec { + fn generate_packet_indexes(vers: &PinnedVec) -> Vec { vers.iter() .enumerate() .filter_map( diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 6b7d9f936..45a7dfabd 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -1,6 +1,7 @@ //! The `fetch_stage` batches input from a UDP socket and sends it to a channel. use crate::banking_stage::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET; +use crate::packet::PacketsRecycler; use crate::poh_recorder::PohRecorder; use crate::result::{Error, Result}; use crate::service::Service; @@ -92,7 +93,8 @@ impl FetchStage { sender: &PacketSender, poh_recorder: &Arc>, ) -> Self { - let recycler = Recycler::default(); + let recycler: PacketsRecycler = Recycler::warmed(1000, 1024); + let tpu_threads = sockets.into_iter().map(|socket| { streamer::receiver( socket, diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 8f30764b3..9e79e2bb7 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -1,6 +1,6 @@ //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel. -use crate::packet::Packet; +use crate::packet::{Packet, PacketsRecycler}; use crate::service::Service; use crate::streamer::{self, PacketReceiver, PacketSender}; use solana_perf::cuda_runtime::PinnedVec; @@ -67,7 +67,8 @@ impl ShredFetchStage { sender: &PacketSender, exit: &Arc, ) -> Self { - let recycler = Recycler::default(); + let recycler: PacketsRecycler = Recycler::warmed(100, 1024); + let tvu_threads = sockets.into_iter().map(|socket| { streamer::receiver( socket, diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 75446fc47..1c9b2bd39 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -23,8 +23,8 @@ impl Default for TransactionSigVerifier { fn default() -> Self { init(); Self { - recycler: Recycler::default(), - recycler_out: Recycler::default(), + recycler: Recycler::warmed(50, 4096), + recycler_out: Recycler::warmed(50, 4096), } } } diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index ce1469bf6..73f8d2b4f 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -30,8 +30,8 @@ impl ShredSigVerifier { Self { bank_forks, leader_schedule_cache, - recycler_offsets: Recycler::default(), - recycler_out: Recycler::default(), + recycler_offsets: Recycler::warmed(50, 4096), + recycler_out: Recycler::warmed(50, 4096), } } fn read_slots(batches: &[Packets]) -> HashSet { diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 04726a111..733db8a55 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -287,7 +287,7 @@ mod test { service::Service, }; use crossbeam_channel::unbounded; - use rand::{seq::SliceRandom, thread_rng}; + use rand::thread_rng; use solana_ledger::shred::DataShredHeader; use solana_ledger::{ blocktree::{get_tmp_ledger_path, make_many_slot_entries, Blocktree}, diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 7bfb896a6..4a7b19e8e 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -127,14 +127,22 @@ fn slot_key_data_for_gpu< } } let mut keyvec = recycler_keys.allocate("shred_gpu_pubkeys"); + keyvec.set_pinnable(); let mut slot_to_key_ix = HashMap::new(); - for (i, (k, slots)) in keys_to_slots.iter().enumerate() { - keyvec.extend(k.as_ref()); + + let keyvec_size = keys_to_slots.len() * size_of::(); + keyvec.resize(keyvec_size, 0); + + for (i, (k, slots)) in keys_to_slots.iter_mut().enumerate() { + let start = i * size_of::(); + let end = start + size_of::(); + keyvec[start..end].copy_from_slice(k.as_ref()); for s in slots { slot_to_key_ix.insert(s, i); } } let mut offsets = recycler_offsets.allocate("shred_offsets"); + offsets.set_pinnable(); slots.iter().for_each(|packet_slots| { packet_slots.iter().for_each(|slot| { offsets @@ -145,18 +153,10 @@ fn slot_key_data_for_gpu< //TODO: GPU needs a more opaque interface, which can handle variable sized structures for data //Pad the Pubkeys buffer such that it is bigger than a buffer of Packet sized elems let num_in_packets = (keyvec.len() + (size_of::() - 1)) / size_of::(); - trace!("num_in_packets {}", num_in_packets); - //number of bytes missing - let missing = num_in_packets * size_of::() - keyvec.len(); - trace!("missing {}", missing); - //extra Pubkeys needed to fill the buffer - let extra = (missing + size_of::() - 1) / size_of::(); - trace!("extra {}", extra); - trace!("keyvec {}", keyvec.len()); - keyvec.resize(keyvec.len() + extra, 0u8); - trace!("keyvec {}", keyvec.len()); - trace!("keyvec {:?}", keyvec); - trace!("offsets {:?}", offsets); + keyvec.resize(num_in_packets * size_of::(), 0u8); + trace!("keyvec.len: {}", keyvec.len()); + trace!("keyvec: {:?}", keyvec); + trace!("offsets: {:?}", offsets); (keyvec, offsets, num_in_packets) } @@ -166,8 +166,11 @@ fn shred_gpu_offsets( recycler_offsets: &Recycler, ) -> (TxOffset, TxOffset, TxOffset, Vec>) { let mut signature_offsets = recycler_offsets.allocate("shred_signatures"); + signature_offsets.set_pinnable(); let mut msg_start_offsets = recycler_offsets.allocate("shred_msg_starts"); + msg_start_offsets.set_pinnable(); let mut msg_sizes = recycler_offsets.allocate("shred_msg_sizes"); + msg_sizes.set_pinnable(); let mut v_sig_lens = vec![]; for batch in batches { let mut sig_lens = Vec::new(); @@ -402,6 +405,7 @@ pub fn sign_shreds_gpu( shred_gpu_offsets(offset, batches, recycler_offsets); let total_sigs = signature_offsets.len(); let mut signatures_out = recycler_out.allocate("ed25519 signatures"); + signatures_out.set_pinnable(); signatures_out.resize(total_sigs * sig_size, 0); elems.push( perf_libs::Elems { diff --git a/perf/Cargo.toml b/perf/Cargo.toml index 2b8318f34..fa654acd4 100644 --- a/perf/Cargo.toml +++ b/perf/Cargo.toml @@ -16,6 +16,7 @@ rayon = "1.2.0" serde = "1.0.102" serde_derive = "1.0.102" dlopen_derive = "0.1.4" +lazy_static = "1.4.0" log = "0.4.8" solana-sdk = { path = "../sdk", version = "0.21.0" } solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.21.0" } diff --git a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs index 5f558ef3d..38c07570e 100644 --- a/perf/src/cuda_runtime.rs +++ b/perf/src/cuda_runtime.rs @@ -7,51 +7,47 @@ use crate::perf_libs; use crate::recycler::Reset; -use std::ops::{Deref, DerefMut}; +use rand::seq::SliceRandom; +use rand::Rng; +use rayon::prelude::*; +use std::ops::{Index, IndexMut}; +use std::slice::SliceIndex; -#[cfg(feature = "pin_gpu_memory")] use std::os::raw::c_int; -#[cfg(feature = "pin_gpu_memory")] const CUDA_SUCCESS: c_int = 0; pub fn pin(_mem: &mut Vec) { - #[cfg(feature = "pin_gpu_memory")] - { - if let Some(api) = perf_libs::api() { - unsafe { - use core::ffi::c_void; - use std::mem::size_of; + if let Some(api) = perf_libs::api() { + unsafe { + use core::ffi::c_void; + use std::mem::size_of; - let err = (api.cuda_host_register)( - _mem.as_mut_ptr() as *mut c_void, - _mem.capacity() * size_of::(), - 0, + let err = (api.cuda_host_register)( + _mem.as_mut_ptr() as *mut c_void, + _mem.capacity() * size_of::(), + 0, + ); + if err != CUDA_SUCCESS { + panic!( + "cudaHostRegister error: {} ptr: {:?} bytes: {}", + err, + _mem.as_ptr(), + _mem.capacity() * size_of::() ); - if err != CUDA_SUCCESS { - error!( - "cudaHostRegister error: {} ptr: {:?} bytes: {}", - err, - _mem.as_ptr(), - _mem.capacity() * size_of::() - ); - } } } } } pub fn unpin(_mem: *mut T) { - #[cfg(feature = "pin_gpu_memory")] - { - if let Some(api) = perf_libs::api() { - unsafe { - use core::ffi::c_void; + if let Some(api) = perf_libs::api() { + unsafe { + use core::ffi::c_void; - let err = (api.cuda_host_unregister)(_mem as *mut c_void); - if err != CUDA_SUCCESS { - error!("cudaHostUnregister returned: {} ptr: {:?}", err, _mem); - } + let err = (api.cuda_host_unregister)(_mem as *mut c_void); + if err != CUDA_SUCCESS { + panic!("cudaHostUnregister returned: {} ptr: {:?}", err, _mem); } } } @@ -71,6 +67,11 @@ impl Reset for PinnedVec { fn reset(&mut self) { self.resize(0, T::default()); } + + fn warm(&mut self, size_hint: usize) { + self.set_pinnable(); + self.resize(size_hint, T::default()); + } } impl Default for PinnedVec { @@ -83,20 +84,6 @@ impl Default for PinnedVec { } } -impl Deref for PinnedVec { - type Target = Vec; - - fn deref(&self) -> &Self::Target { - &self.x - } -} - -impl DerefMut for PinnedVec { - fn deref_mut(&mut self) -> &mut Vec { - &mut self.x - } -} - pub struct PinnedIter<'a, T>(std::slice::Iter<'a, T>); pub struct PinnedIterMut<'a, T>(std::slice::IterMut<'a, T>); @@ -122,7 +109,7 @@ impl<'a, T> IntoIterator for &'a mut PinnedVec { type IntoIter = PinnedIter<'a, T>; fn into_iter(self) -> Self::IntoIter { - PinnedIter(self.iter()) + PinnedIter(self.x.iter()) } } @@ -131,7 +118,41 @@ impl<'a, T> IntoIterator for &'a PinnedVec { type IntoIter = PinnedIter<'a, T>; fn into_iter(self) -> Self::IntoIter { - PinnedIter(self.iter()) + PinnedIter(self.x.iter()) + } +} + +impl> Index for PinnedVec { + type Output = I::Output; + + #[inline] + fn index(&self, index: I) -> &Self::Output { + &self.x[index] + } +} + +impl> IndexMut for PinnedVec { + #[inline] + fn index_mut(&mut self, index: I) -> &mut Self::Output { + &mut self.x[index] + } +} + +impl PinnedVec { + pub fn iter(&self) -> PinnedIter { + PinnedIter(self.x.iter()) + } + + pub fn iter_mut(&mut self) -> PinnedIterMut { + PinnedIterMut(self.x.iter_mut()) + } +} + +impl<'a, T: Send + Sync> IntoParallelIterator for &'a PinnedVec { + type Iter = rayon::slice::Iter<'a, T>; + type Item = &'a T; + fn into_par_iter(self) -> Self::Iter { + self.x.par_iter() } } @@ -172,14 +193,6 @@ impl PinnedVec { } } - pub fn iter(&self) -> PinnedIter { - PinnedIter(self.x.iter()) - } - - pub fn iter_mut(&mut self) -> PinnedIterMut { - PinnedIterMut(self.x.iter_mut()) - } - pub fn is_empty(&self) -> bool { self.x.is_empty() } @@ -196,30 +209,49 @@ impl PinnedVec { self.x.as_mut_ptr() } - pub fn push(&mut self, x: T) { + fn prepare_realloc(&mut self, new_size: usize) -> (*mut T, usize) { let old_ptr = self.x.as_mut_ptr(); let old_capacity = self.x.capacity(); - // Predict realloc and unpin - if self.pinned && self.x.capacity() == self.x.len() { + // Predict realloc and unpin. + if self.pinned && self.x.capacity() < new_size { unpin(old_ptr); self.pinned = false; } + (old_ptr, old_capacity) + } + + pub fn push(&mut self, x: T) { + let (old_ptr, old_capacity) = self.prepare_realloc(self.x.len() + 1); self.x.push(x); self.check_ptr(old_ptr, old_capacity, "push"); } + pub fn truncate(&mut self, size: usize) { + self.x.truncate(size); + } + pub fn resize(&mut self, size: usize, elem: T) { - let old_ptr = self.x.as_mut_ptr(); - let old_capacity = self.x.capacity(); - // Predict realloc and unpin. - if self.pinned && self.x.capacity() < size { - unpin(old_ptr); - self.pinned = false; - } + let (old_ptr, old_capacity) = self.prepare_realloc(size); self.x.resize(size, elem); self.check_ptr(old_ptr, old_capacity, "resize"); } + pub fn append(&mut self, other: &mut Vec) { + let (old_ptr, old_capacity) = self.prepare_realloc(self.x.len() + other.len()); + self.x.append(other); + self.check_ptr(old_ptr, old_capacity, "resize"); + } + + pub fn append_pinned(&mut self, other: &mut Self) { + let (old_ptr, old_capacity) = self.prepare_realloc(self.x.len() + other.len()); + self.x.append(&mut other.x); + self.check_ptr(old_ptr, old_capacity, "resize"); + } + + pub fn shuffle(&mut self, rng: &mut R) { + self.x.shuffle(rng) + } + fn check_ptr(&mut self, _old_ptr: *mut T, _old_capacity: usize, _from: &'static str) { let api = perf_libs::api(); if api.is_some() diff --git a/perf/src/lib.rs b/perf/src/lib.rs index 04f2246d6..7691978d2 100644 --- a/perf/src/lib.rs +++ b/perf/src/lib.rs @@ -5,6 +5,9 @@ pub mod recycler; pub mod sigverify; pub mod test_tx; +#[macro_use] +extern crate lazy_static; + #[macro_use] extern crate log; diff --git a/perf/src/packet.rs b/perf/src/packet.rs index 321e2de11..b2d70a795 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -33,6 +33,11 @@ impl Reset for Packets { fn reset(&mut self) { self.packets.resize(0, Packet::default()); } + + fn warm(&mut self, size_hint: usize) { + self.packets.set_pinnable(); + self.packets.resize(size_hint, Packet::default()); + } } //auto derive doesn't support large arrays diff --git a/perf/src/recycler.rs b/perf/src/recycler.rs index 1eefc5295..420e12b1c 100644 --- a/perf/src/recycler.rs +++ b/perf/src/recycler.rs @@ -1,4 +1,5 @@ use rand::{thread_rng, Rng}; +use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; @@ -40,9 +41,37 @@ impl Clone for Recycler { pub trait Reset { fn reset(&mut self); + fn warm(&mut self, size_hint: usize); +} + +lazy_static! { + static ref WARM_RECYCLERS: AtomicBool = AtomicBool::new(false); +} + +pub fn enable_recycler_warming() { + WARM_RECYCLERS.store(true, Ordering::Relaxed); +} + +fn warm_recyclers() -> bool { + WARM_RECYCLERS.load(Ordering::Relaxed) } impl Recycler { + pub fn warmed(num: usize, size_hint: usize) -> Self { + let new = Self::default(); + if warm_recyclers() { + let warmed_items: Vec<_> = (0..num) + .map(|_| { + let mut item = new.allocate("warming"); + item.warm(size_hint); + item + }) + .collect(); + warmed_items.into_iter().for_each(|i| new.recycle(i)); + } + new + } + pub fn allocate(&self, name: &'static str) -> T { let new = self .gc @@ -93,6 +122,7 @@ mod tests { fn reset(&mut self) { *self = 10; } + fn warm(&mut self, _size_hint: usize) {} } #[test] diff --git a/validator/src/main.rs b/validator/src/main.rs index 216206ae7..f7b6fab1b 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -12,6 +12,7 @@ use solana_core::service::Service; use solana_core::socketaddr; use solana_core::validator::{Validator, ValidatorConfig}; use solana_ledger::bank_forks::SnapshotConfig; +use solana_perf::recycler::enable_recycler_warming; use solana_sdk::clock::Slot; use solana_sdk::hash::Hash; use solana_sdk::signature::{read_keypair_file, Keypair, KeypairUtil}; @@ -574,6 +575,7 @@ pub fn main() { if cuda { solana_perf::perf_libs::init_cuda(); + enable_recycler_warming(); } let mut gossip_addr = solana_netutil::parse_port_or_addr(