From fbea9d8621076e5a0d0f3ddf83a575d4b48bbdd4 Mon Sep 17 00:00:00 2001 From: sakridge Date: Thu, 27 Jun 2019 09:32:32 +0200 Subject: [PATCH] Page-pin packet memory for cuda (#4250) * Page-pin packet memory for cuda Bring back recyclers and pin offset buffers * Add packet recycler to streamer * Add set_pinnable to sigverify vecs to pin them * Add packets reset test * Add test for recycler and reduce the gc lock critical section * Add comments/tests to cuda_runtime * Add recycler to recv_blobs path. * Add trace/names for debug and PacketsRecycler to bench-streamer * Predict realloc and unpin beforehand. * Add helper to reserve and pin * Cap buffered packets length * Call cuda wrapper functions --- bench-streamer/src/main.rs | 12 +- core/benches/sigverify.rs | 5 +- core/src/banking_stage.rs | 3 + core/src/cuda_runtime.rs | 297 ++++++++++++++++++++++++++++++++++++ core/src/fetch_stage.rs | 14 +- core/src/lib.rs | 2 + core/src/packet.rs | 74 +++++++-- core/src/recycler.rs | 111 ++++++++++++++ core/src/replicator.rs | 10 +- core/src/sigverify.rs | 61 ++++++-- core/src/sigverify_stage.rs | 49 ++++-- core/src/streamer.rs | 29 ++-- fetch-perf-libs.sh | 2 +- 13 files changed, 613 insertions(+), 56 deletions(-) create mode 100644 core/src/cuda_runtime.rs create mode 100644 core/src/recycler.rs diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 96273bac4..343ea5a43 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -1,4 +1,5 @@ use clap::{crate_description, crate_name, crate_version, App, Arg}; +use solana::packet::PacketsRecycler; use solana::packet::{Packet, Packets, BLOB_SIZE, PACKET_DATA_SIZE}; use solana::result::Result; use solana::streamer::{receiver, PacketReceiver}; @@ -16,7 +17,7 @@ fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut msgs = Packets::default(); msgs.packets.resize(10, Packet::default()); - for w in &mut msgs.packets { + for w in msgs.packets.iter_mut() { w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&addr); } @@ -74,6 +75,7 @@ fn main() -> Result<()> { let mut read_channels = Vec::new(); let mut read_threads = Vec::new(); + let recycler = PacketsRecycler::default(); for _ in 0..num_sockets { let read = solana_netutil::bind_to(port, false).unwrap(); read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); @@ -83,7 +85,13 @@ fn main() -> Result<()> { let (s_reader, r_reader) = channel(); read_channels.push(r_reader); - read_threads.push(receiver(Arc::new(read), &exit, s_reader)); + read_threads.push(receiver( + Arc::new(read), + &exit, + s_reader, + recycler.clone(), + "bench-streamer-test", + )); } let t_producer1 = producer(&addr, exit.clone()); diff --git a/core/benches/sigverify.rs b/core/benches/sigverify.rs index 72d580a79..ccf8b5bf3 100644 --- a/core/benches/sigverify.rs +++ b/core/benches/sigverify.rs @@ -3,6 +3,7 @@ extern crate test; use solana::packet::to_packets; +use solana::recycler::Recycler; use solana::sigverify; use solana::test_tx::test_tx; use test::Bencher; @@ -14,8 +15,10 @@ fn bench_sigverify(bencher: &mut Bencher) { // generate packet vector let batches = to_packets(&vec![tx; 128]); + let recycler = Recycler::default(); + let recycler_out = Recycler::default(); // verify packets bencher.iter(|| { - let _ans = sigverify::ed25519_verify(&batches); + let _ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); }) } diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 11f4461f4..c953c6c40 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -818,6 +818,9 @@ impl BankingStage { packet_indexes: Vec, ) { if !packet_indexes.is_empty() { + if unprocessed_packets.len() > 400 { + unprocessed_packets.remove(0); + } unprocessed_packets.push((packets, packet_indexes)); } } diff --git a/core/src/cuda_runtime.rs b/core/src/cuda_runtime.rs new file mode 100644 index 000000000..929468eb1 --- /dev/null +++ b/core/src/cuda_runtime.rs @@ -0,0 +1,297 @@ +// Module for cuda-related helper functions and wrappers. +// +// cudaHostRegister/cudaHostUnregister - +// apis for page-pinning memory. Cuda driver/hardware cannot overlap +// copies from host memory to GPU memory unless the memory is page-pinned and +// cannot be paged to disk. The cuda driver provides these interfaces to pin and unpin memory. + +use crate::recycler::Reset; +#[cfg(feature = "cuda")] +use crate::sigverify::{cuda_host_register, cuda_host_unregister}; +use std::ops::{Deref, DerefMut}; + +#[cfg(feature = "cuda")] +use std::mem::size_of; + +#[cfg(feature = "cuda")] +use core::ffi::c_void; + +#[cfg(feature = "cuda")] +use std::os::raw::c_int; + +#[cfg(feature = "cuda")] +const CUDA_SUCCESS: c_int = 0; + +pub fn pin(_mem: &mut Vec) { + #[cfg(feature = "cuda")] + unsafe { + let err = cuda_host_register( + _mem.as_mut_ptr() as *mut c_void, + _mem.capacity() * size_of::(), + 0, + ); + if err != CUDA_SUCCESS { + error!( + "cudaHostRegister error: {} ptr: {:?} bytes: {}", + err, + _mem.as_ptr(), + _mem.capacity() * size_of::() + ); + } + } +} + +pub fn unpin(_mem: *mut T) { + #[cfg(feature = "cuda")] + unsafe { + let err = cuda_host_unregister(_mem as *mut c_void); + if err != CUDA_SUCCESS { + error!("cudaHostUnregister returned: {} ptr: {:?}", err, _mem); + } + } +} + +// A vector wrapper where the underlying memory can be +// page-pinned. Controlled by flags in case user only wants +// to pin in certain circumstances. +#[derive(Debug)] +pub struct PinnedVec { + x: Vec, + pinned: bool, + pinnable: bool, +} + +impl Reset for PinnedVec { + fn reset(&mut self) { + self.resize(0, 0u8); + } +} + +impl Reset for PinnedVec { + fn reset(&mut self) { + self.resize(0, 0u32); + } +} + +impl Default for PinnedVec { + fn default() -> Self { + Self { + x: Vec::new(), + pinned: false, + pinnable: false, + } + } +} + +impl Deref for PinnedVec { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.x + } +} + +impl DerefMut for PinnedVec { + fn deref_mut(&mut self) -> &mut Vec { + &mut self.x + } +} + +pub struct PinnedIter<'a, T>(std::slice::Iter<'a, T>); + +pub struct PinnedIterMut<'a, T>(std::slice::IterMut<'a, T>); + +impl<'a, T> Iterator for PinnedIter<'a, T> { + type Item = &'a T; + + fn next(&mut self) -> Option { + self.0.next() + } +} + +impl<'a, T> Iterator for PinnedIterMut<'a, T> { + type Item = &'a mut T; + + fn next(&mut self) -> Option { + self.0.next() + } +} + +impl<'a, T> IntoIterator for &'a mut PinnedVec { + type Item = &'a T; + type IntoIter = PinnedIter<'a, T>; + + fn into_iter(self) -> Self::IntoIter { + PinnedIter(self.iter()) + } +} + +impl<'a, T> IntoIterator for &'a PinnedVec { + type Item = &'a T; + type IntoIter = PinnedIter<'a, T>; + + fn into_iter(self) -> Self::IntoIter { + PinnedIter(self.iter()) + } +} + +impl PinnedVec { + pub fn reserve_and_pin(&mut self, size: usize) { + if self.x.capacity() < size { + if self.pinned { + unpin(&mut self.x); + self.pinned = false; + } + self.x.reserve(size); + } + self.set_pinnable(); + if !self.pinned { + pin(&mut self.x); + self.pinned = true; + } + } + + pub fn set_pinnable(&mut self) { + self.pinnable = true; + } + + pub fn from_vec(source: Vec) -> Self { + Self { + x: source, + pinned: false, + pinnable: false, + } + } + + pub fn with_capacity(capacity: usize) -> Self { + let x = Vec::with_capacity(capacity); + Self { + x, + pinned: false, + pinnable: false, + } + } + + pub fn iter(&self) -> PinnedIter { + PinnedIter(self.x.iter()) + } + + pub fn iter_mut(&mut self) -> PinnedIterMut { + PinnedIterMut(self.x.iter_mut()) + } + + pub fn is_empty(&self) -> bool { + self.x.is_empty() + } + + pub fn len(&self) -> usize { + self.x.len() + } + + #[cfg(feature = "cuda")] + pub fn as_ptr(&self) -> *const T { + self.x.as_ptr() + } + + #[cfg(feature = "cuda")] + pub fn as_mut_ptr(&mut self) -> *mut T { + self.x.as_mut_ptr() + } + + pub fn push(&mut self, x: T) { + let old_ptr = self.x.as_mut_ptr(); + let old_capacity = self.x.capacity(); + // Predict realloc and unpin + if self.pinned && self.x.capacity() == self.x.len() { + unpin(old_ptr); + self.pinned = false; + } + self.x.push(x); + self.check_ptr(old_ptr, old_capacity, "push"); + } + + pub fn resize(&mut self, size: usize, elem: T) { + let old_ptr = self.x.as_mut_ptr(); + let old_capacity = self.x.capacity(); + // Predict realloc and unpin. + if self.pinned && self.x.capacity() < size { + unpin(old_ptr); + self.pinned = false; + } + self.x.resize(size, elem); + self.check_ptr(old_ptr, old_capacity, "resize"); + } + + fn check_ptr(&mut self, _old_ptr: *mut T, _old_capacity: usize, _from: &'static str) { + #[cfg(feature = "cuda")] + { + if self.pinnable && (self.x.as_ptr() != _old_ptr || self.x.capacity() != _old_capacity) + { + if self.pinned { + unpin(_old_ptr); + } + + trace!( + "pinning from check_ptr old: {} size: {} from: {}", + _old_capacity, + self.x.capacity(), + _from + ); + pin(&mut self.x); + self.pinned = true; + } + } + } +} + +impl Clone for PinnedVec { + fn clone(&self) -> Self { + let mut x = self.x.clone(); + let pinned = if self.pinned { + pin(&mut x); + true + } else { + false + }; + debug!( + "clone PinnedVec: size: {} pinned?: {} pinnable?: {}", + self.x.capacity(), + self.pinned, + self.pinnable + ); + Self { + x, + pinned, + pinnable: self.pinnable, + } + } +} + +impl Drop for PinnedVec { + fn drop(&mut self) { + if self.pinned { + unpin(self.x.as_mut_ptr()); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_pinned_vec() { + let mut mem = PinnedVec::with_capacity(10); + mem.set_pinnable(); + mem.push(50); + mem.resize(2, 10); + assert_eq!(mem[0], 50); + assert_eq!(mem[1], 10); + assert_eq!(mem.len(), 2); + assert_eq!(mem.is_empty(), false); + let mut iter = mem.iter(); + assert_eq!(*iter.next().unwrap(), 50); + assert_eq!(*iter.next().unwrap(), 10); + assert_eq!(iter.next(), None); + } +} diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 5b2736f1a..675e5906b 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -2,6 +2,7 @@ use crate::banking_stage::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET; use crate::poh_recorder::PohRecorder; +use crate::recycler::Recycler; use crate::result::{Error, Result}; use crate::service::Service; use crate::streamer::{self, PacketReceiver, PacketSender}; @@ -87,9 +88,16 @@ impl FetchStage { sender: &PacketSender, poh_recorder: &Arc>, ) -> Self { - let tpu_threads = sockets - .into_iter() - .map(|socket| streamer::receiver(socket, &exit, sender.clone())); + let recycler = Recycler::default(); + let tpu_threads = sockets.into_iter().map(|socket| { + streamer::receiver( + socket, + &exit, + sender.clone(), + recycler.clone(), + "fetch_stage", + ) + }); let (forward_sender, forward_receiver) = channel(); let tpu_via_blobs_threads = tpu_via_blobs_sockets diff --git a/core/src/lib.rs b/core/src/lib.rs index 460ae5d51..23afb4593 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -13,6 +13,7 @@ pub mod chacha; #[cfg(cuda)] pub mod chacha_cuda; pub mod cluster_info_vote_listener; +pub mod recycler; #[macro_use] pub mod contact_info; pub mod crds; @@ -31,6 +32,7 @@ pub mod cluster_info; pub mod cluster_info_repair_listener; pub mod cluster_tests; pub mod consensus; +pub mod cuda_runtime; pub mod entry; pub mod erasure; pub mod fetch_stage; diff --git a/core/src/packet.rs b/core/src/packet.rs index 61b701fb8..26998ca74 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -1,5 +1,7 @@ //! The `packet` module defines data structures and methods to pull data from the network. +use crate::cuda_runtime::PinnedVec; use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS}; +use crate::recycler::{Recycler, Reset}; use crate::result::{Error, Result}; use bincode; use byteorder::{ByteOrder, LittleEndian}; @@ -16,6 +18,7 @@ use std::fmt; use std::io; use std::io::Cursor; use std::io::Write; +use std::mem; use std::mem::size_of; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::ops::{Deref, DerefMut}; @@ -124,21 +127,61 @@ impl Meta { #[derive(Debug, Clone)] pub struct Packets { - pub packets: Vec, + pub packets: PinnedVec, + + recycler: Option, +} + +impl Drop for Packets { + fn drop(&mut self) { + if let Some(ref recycler) = self.recycler { + let old = mem::replace(&mut self.packets, PinnedVec::default()); + recycler.recycle(old) + } + } +} + +impl Reset for Packets { + fn reset(&mut self) { + self.packets.resize(0, Packet::default()); + } +} + +impl Reset for PinnedVec { + fn reset(&mut self) { + self.resize(0, Packet::default()); + } } //auto derive doesn't support large arrays impl Default for Packets { fn default() -> Packets { + let packets = PinnedVec::with_capacity(NUM_RCVMMSGS); Packets { - packets: Vec::with_capacity(NUM_RCVMMSGS), + packets, + recycler: None, } } } +pub type PacketsRecycler = Recycler>; + impl Packets { pub fn new(packets: Vec) -> Self { - Self { packets } + let packets = PinnedVec::from_vec(packets); + Self { + packets, + recycler: None, + } + } + + pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self { + let mut packets = recycler.allocate(name); + packets.reserve_and_pin(size); + Packets { + packets, + recycler: Some(recycler), + } } pub fn set_addr(&mut self, addr: &SocketAddr) { @@ -516,9 +559,8 @@ impl Blob { } // other side of store_packets - pub fn load_packets(&self) -> Vec { + pub fn load_packets(&self, packets: &mut PinnedVec) { // rough estimate - let mut packets: Vec = Vec::with_capacity(self.size() / PACKET_DATA_SIZE); let mut pos = 0; let size_len = bincode::serialized_size(&0usize).unwrap() as usize; @@ -538,7 +580,6 @@ impl Blob { pos += size; packets.push(packet); } - packets } pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> { @@ -652,7 +693,7 @@ mod tests { // test that the address is actually being updated let send_addr = socketaddr!([127, 0, 0, 1], 123); let packets = vec![Packet::default()]; - let mut msgs = Packets { packets }; + let mut msgs = Packets::new(packets); msgs.set_addr(&send_addr); assert_eq!(SocketAddr::from(msgs.packets[0].meta.addr()), send_addr); } @@ -678,7 +719,7 @@ mod tests { assert_eq!(recvd, p.packets.len()); - for m in p.packets { + for m in &p.packets { assert_eq!(m.meta.size, PACKET_DATA_SIZE); assert_eq!(m.meta.addr(), saddr); } @@ -810,10 +851,12 @@ mod tests { let blobs = packets_to_blobs(&packets[..]); - let reconstructed_packets: Vec = - blobs.iter().flat_map(|b| b.load_packets()).collect(); + let mut reconstructed_packets = PinnedVec::default(); + blobs + .iter() + .for_each(|b| b.load_packets(&mut reconstructed_packets)); - assert_eq!(reconstructed_packets, packets); + assert_eq!(reconstructed_packets[..], packets[..]); } #[test] @@ -862,4 +905,13 @@ mod tests { b.sign(&k); assert!(b.verify()); } + + #[test] + fn test_packets_reset() { + let mut packets = Packets::default(); + packets.packets.resize(10, Packet::default()); + assert_eq!(packets.packets.len(), 10); + packets.reset(); + assert_eq!(packets.packets.len(), 0); + } } diff --git a/core/src/recycler.rs b/core/src/recycler.rs new file mode 100644 index 000000000..1eefc5295 --- /dev/null +++ b/core/src/recycler.rs @@ -0,0 +1,111 @@ +use rand::{thread_rng, Rng}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; + +#[derive(Debug, Default)] +struct RecyclerStats { + total: AtomicUsize, + reuse: AtomicUsize, + max_gc: AtomicUsize, +} + +#[derive(Debug)] +pub struct Recycler { + gc: Arc>>, + stats: Arc, + id: usize, +} + +impl Default for Recycler { + fn default() -> Recycler { + let id = thread_rng().gen_range(0, 1000); + trace!("new recycler..{}", id); + Recycler { + gc: Arc::new(Mutex::new(vec![])), + stats: Arc::new(RecyclerStats::default()), + id, + } + } +} + +impl Clone for Recycler { + fn clone(&self) -> Recycler { + Recycler { + gc: self.gc.clone(), + stats: self.stats.clone(), + id: self.id, + } + } +} + +pub trait Reset { + fn reset(&mut self); +} + +impl Recycler { + pub fn allocate(&self, name: &'static str) -> T { + let new = self + .gc + .lock() + .expect("recycler lock in pb fn allocate") + .pop(); + + if let Some(mut x) = new { + self.stats.reuse.fetch_add(1, Ordering::Relaxed); + x.reset(); + return x; + } + + trace!( + "allocating new: total {} {:?} id: {} reuse: {} max_gc: {}", + self.stats.total.fetch_add(1, Ordering::Relaxed), + name, + self.id, + self.stats.reuse.load(Ordering::Relaxed), + self.stats.max_gc.load(Ordering::Relaxed), + ); + + T::default() + } + + pub fn recycle(&self, x: T) { + let len = { + let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle"); + gc.push(x); + gc.len() + }; + + let max_gc = self.stats.max_gc.load(Ordering::Relaxed); + if len > max_gc { + // this is not completely accurate, but for most cases should be fine. + self.stats + .max_gc + .compare_and_swap(max_gc, len, Ordering::Relaxed); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + impl Reset for u64 { + fn reset(&mut self) { + *self = 10; + } + } + + #[test] + fn test_recycler() { + let recycler = Recycler::default(); + let mut y: u64 = recycler.allocate("test_recycler1"); + assert_eq!(y, 0); + y = 20; + let recycler2 = recycler.clone(); + recycler2.recycle(y); + assert_eq!(recycler.gc.lock().unwrap().len(), 1); + let z = recycler.allocate("test_recycler2"); + assert_eq!(z, 10); + assert_eq!(recycler.gc.lock().unwrap().len(), 0); + } +} diff --git a/core/src/replicator.rs b/core/src/replicator.rs index c827708d2..4a1b0be69 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -5,6 +5,7 @@ use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE}; use crate::contact_info::ContactInfo; use crate::gossip_service::GossipService; use crate::packet::to_shared_blob; +use crate::recycler::Recycler; use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; @@ -121,7 +122,14 @@ fn create_request_processor( let (s_reader, r_reader) = channel(); let (s_responder, r_responder) = channel(); let storage_socket = Arc::new(socket); - let t_receiver = receiver(storage_socket.clone(), exit, s_reader); + let recycler = Recycler::default(); + let t_receiver = receiver( + storage_socket.clone(), + exit, + s_reader, + recycler, + "replicator", + ); thread_handles.push(t_receiver); let t_responder = responder("replicator-responder", storage_socket.clone(), r_responder); diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 4a853e660..6a7a997ba 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -4,7 +4,9 @@ //! offloaded to the GPU. //! +use crate::cuda_runtime::PinnedVec; use crate::packet::{Packet, Packets}; +use crate::recycler::Recycler; use crate::result::Result; use bincode::serialized_size; use rayon::ThreadPool; @@ -18,7 +20,10 @@ use solana_sdk::transaction::Transaction; use std::mem::size_of; #[cfg(feature = "cuda")] -use std::os::raw::c_int; +use std::os::raw::{c_int, c_uint}; + +#[cfg(feature = "cuda")] +use core::ffi::c_void; pub const NUM_THREADS: u32 = 10; use std::cell::RefCell; @@ -28,7 +33,9 @@ thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon:: .build() .unwrap())); -type TxOffsets = (Vec, Vec, Vec, Vec, Vec>); +pub type TxOffset = PinnedVec; + +type TxOffsets = (TxOffset, TxOffset, TxOffset, TxOffset, Vec>); #[cfg(feature = "cuda")] #[repr(C)] @@ -78,6 +85,9 @@ extern "C" { num_elems: usize, use_non_default_stream: u8, ) -> c_int; + + pub fn cuda_host_register(ptr: *mut c_void, size: usize, flags: c_uint) -> c_int; + pub fn cuda_host_unregister(ptr: *mut c_void) -> c_int; } #[cfg(not(feature = "cuda"))] @@ -122,7 +132,11 @@ fn batch_size(batches: &[Packets]) -> usize { } #[cfg(not(feature = "cuda"))] -pub fn ed25519_verify(batches: &[Packets]) -> Vec> { +pub fn ed25519_verify( + batches: &[Packets], + _recycler: &Recycler, + _recycler_out: &Recycler>, +) -> Vec> { ed25519_verify_cpu(batches) } @@ -145,11 +159,16 @@ pub fn get_packet_offsets(packet: &Packet, current_offset: u32) -> (u32, u32, u3 ) } -pub fn generate_offsets(batches: &[Packets]) -> Result { - let mut signature_offsets: Vec<_> = Vec::new(); - let mut pubkey_offsets: Vec<_> = Vec::new(); - let mut msg_start_offsets: Vec<_> = Vec::new(); - let mut msg_sizes: Vec<_> = Vec::new(); +pub fn generate_offsets(batches: &[Packets], recycler: &Recycler) -> Result { + debug!("allocating.."); + let mut signature_offsets: PinnedVec<_> = recycler.allocate("sig_offsets"); + signature_offsets.set_pinnable(); + let mut pubkey_offsets: PinnedVec<_> = recycler.allocate("pubkey_offsets"); + pubkey_offsets.set_pinnable(); + let mut msg_start_offsets: PinnedVec<_> = recycler.allocate("msg_start_offsets"); + msg_start_offsets.set_pinnable(); + let mut msg_sizes: PinnedVec<_> = recycler.allocate("msg_size_offsets"); + msg_sizes.set_pinnable(); let mut current_packet = 0; let mut v_sig_lens = Vec::new(); batches.iter().for_each(|p| { @@ -229,7 +248,11 @@ pub fn init() { } #[cfg(feature = "cuda")] -pub fn ed25519_verify(batches: &[Packets]) -> Vec> { +pub fn ed25519_verify( + batches: &[Packets], + recycler: &Recycler, + recycler_out: &Recycler>, +) -> Vec> { use crate::packet::PACKET_DATA_SIZE; let count = batch_size(batches); @@ -243,10 +266,12 @@ pub fn ed25519_verify(batches: &[Packets]) -> Vec> { } let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) = - generate_offsets(batches).unwrap(); + generate_offsets(batches, recycler).unwrap(); debug!("CUDA ECDSA for {}", batch_size(batches)); - let mut out = Vec::new(); + debug!("allocating out.."); + let mut out = recycler_out.allocate("out_buffer"); + out.set_pinnable(); let mut elems = Vec::new(); let mut rvs = Vec::new(); @@ -303,6 +328,11 @@ pub fn ed25519_verify(batches: &[Packets]) -> Vec> { } } inc_new_counter_debug!("ed25519_verify_gpu", count); + recycler_out.recycle(out); + recycler.recycle(signature_offsets); + recycler.recycle(pubkey_offsets); + recycler.recycle(msg_sizes); + recycler.recycle(msg_start_offsets); rvs } @@ -320,6 +350,7 @@ pub fn make_packet_from_transaction(tx: Transaction) -> Packet { #[cfg(test)] mod tests { use crate::packet::{Packet, Packets}; + use crate::recycler::Recycler; use crate::sigverify; use crate::test_tx::{test_multisig_tx, test_tx}; use bincode::{deserialize, serialize}; @@ -461,8 +492,10 @@ mod tests { let batches = generate_packet_vec(&packet, n, 2); + let recycler = Recycler::default(); + let recycler_out = Recycler::default(); // verify packets - let ans = sigverify::ed25519_verify(&batches); + let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); // check result let ref_ans = if modify_data { 0u8 } else { 1u8 }; @@ -499,8 +532,10 @@ mod tests { batches[0].packets.push(packet); + let recycler = Recycler::default(); + let recycler_out = Recycler::default(); // verify packets - let ans = sigverify::ed25519_verify(&batches); + let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out); // check result let ref_ans = 1u8; diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index a83075dc8..e9e72cf90 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -5,10 +5,13 @@ //! transaction. All processing is done on the CPU by default and on a GPU //! if the `cuda` feature is enabled with `--features=cuda`. +use crate::cuda_runtime::PinnedVec; use crate::packet::Packets; +use crate::recycler::Recycler; use crate::result::{Error, Result}; use crate::service::Service; use crate::sigverify; +use crate::sigverify::TxOffset; use crate::streamer::{self, PacketReceiver}; use crossbeam_channel::Sender as CrossbeamSender; use solana_metrics::{datapoint_info, inc_new_counter_info}; @@ -19,7 +22,7 @@ use std::thread::{self, Builder, JoinHandle}; use std::time::Instant; #[cfg(feature = "cuda")] -const RECV_BATCH_MAX: usize = 60_000; +const RECV_BATCH_MAX: usize = 5_000; #[cfg(not(feature = "cuda"))] const RECV_BATCH_MAX: usize = 1000; @@ -43,11 +46,16 @@ impl SigVerifyStage { Self { thread_hdls } } - fn verify_batch(batch: Vec, sigverify_disabled: bool) -> VerifiedPackets { + fn verify_batch( + batch: Vec, + sigverify_disabled: bool, + recycler: &Recycler, + recycler_out: &Recycler>, + ) -> VerifiedPackets { let r = if sigverify_disabled { sigverify::ed25519_verify_disabled(&batch) } else { - sigverify::ed25519_verify(&batch) + sigverify::ed25519_verify(&batch, recycler, recycler_out) }; batch.into_iter().zip(r).collect() } @@ -57,6 +65,8 @@ impl SigVerifyStage { sendr: &CrossbeamSender, sigverify_disabled: bool, id: usize, + recycler: &Recycler, + recycler_out: &Recycler>, ) -> Result<()> { let (batch, len, recv_time) = streamer::recv_batch( &recvr.lock().expect("'recvr' lock in fn verifier"), @@ -69,11 +79,11 @@ impl SigVerifyStage { debug!( "@{:?} verifier: verifying: {} id: {}", timing::timestamp(), - batch.len(), + len, id ); - let verified_batch = Self::verify_batch(batch, sigverify_disabled); + let verified_batch = Self::verify_batch(batch, sigverify_disabled, recycler, recycler_out); inc_new_counter_info!("sigverify_stage-verified_packets_send", len); if sendr.send(verified_batch).is_err() { @@ -114,17 +124,26 @@ impl SigVerifyStage { ) -> JoinHandle<()> { Builder::new() .name(format!("solana-verifier-{}", id)) - .spawn(move || loop { - if let Err(e) = - Self::verifier(&packet_receiver, &verified_sender, sigverify_disabled, id) - { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - Error::SendError => { - break; + .spawn(move || { + let recycler = Recycler::default(); + let recycler_out = Recycler::default(); + loop { + if let Err(e) = Self::verifier( + &packet_receiver, + &verified_sender, + sigverify_disabled, + id, + &recycler, + &recycler_out, + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::SendError => { + break; + } + _ => error!("{:?}", e), } - _ => error!("{:?}", e), } } }) diff --git a/core/src/streamer.rs b/core/src/streamer.rs index b56553400..9663e2af1 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -1,7 +1,7 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! -use crate::packet::{Blob, Packets, SharedBlobs}; +use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs}; use crate::result::{Error, Result}; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; @@ -16,9 +16,15 @@ pub type PacketSender = Sender; pub type BlobSender = Sender; pub type BlobReceiver = Receiver; -fn recv_loop(sock: &UdpSocket, exit: Arc, channel: &PacketSender) -> Result<()> { +fn recv_loop( + sock: &UdpSocket, + exit: Arc, + channel: &PacketSender, + recycler: &PacketsRecycler, + name: &'static str, +) -> Result<()> { loop { - let mut msgs = Packets::default(); + let mut msgs = Packets::new_with_recycler(recycler.clone(), 256, name); loop { // Check for exit signal, even if socket is busy // (for instance the leader trasaction socket) @@ -37,6 +43,8 @@ pub fn receiver( sock: Arc, exit: &Arc, packet_sender: PacketSender, + recycler: PacketsRecycler, + name: &'static str, ) -> JoinHandle<()> { let res = sock.set_read_timeout(Some(Duration::new(1, 0))); if res.is_err() { @@ -46,7 +54,7 @@ pub fn receiver( Builder::new() .name("solana-receiver".to_string()) .spawn(move || { - let _ = recv_loop(&sock, exit, &packet_sender); + let _ = recv_loop(&sock, exit, &packet_sender, &recycler.clone(), name); }) .unwrap() } @@ -126,7 +134,7 @@ pub fn blob_receiver( .unwrap() } -fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> { +fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender, recycler: &PacketsRecycler) -> Result<()> { trace!( "recv_blob_packets: receiving on {}", sock.local_addr().unwrap() @@ -134,8 +142,9 @@ fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> { let blobs = Blob::recv_from(sock)?; for blob in blobs { - let packets = blob.read().unwrap().load_packets(); - s.send(Packets::new(packets))?; + let mut packets = Packets::new_with_recycler(recycler.clone(), 256, "recv_blob_packets"); + blob.read().unwrap().load_packets(&mut packets.packets); + s.send(packets)?; } Ok(()) @@ -152,13 +161,14 @@ pub fn blob_packet_receiver( sock.set_read_timeout(Some(timer)) .expect("set socket timeout"); let exit = exit.clone(); + let recycler = PacketsRecycler::default(); Builder::new() .name("solana-blob_packet_receiver".to_string()) .spawn(move || loop { if exit.load(Ordering::Relaxed) { break; } - let _ = recv_blob_packets(&sock, &s); + let _ = recv_blob_packets(&sock, &s, &recycler); }) .unwrap() } @@ -167,6 +177,7 @@ pub fn blob_packet_receiver( mod test { use super::*; use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; + use crate::recycler::Recycler; use crate::streamer::{receiver, responder}; use std::io; use std::io::Write; @@ -207,7 +218,7 @@ mod test { let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); let (s_reader, r_reader) = channel(); - let t_receiver = receiver(Arc::new(read), &exit, s_reader); + let t_receiver = receiver(Arc::new(read), &exit, s_reader, Recycler::default(), "test"); let t_responder = { let (s_responder, r_responder) = channel(); let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); diff --git a/fetch-perf-libs.sh b/fetch-perf-libs.sh index 868f7acac..bc527e530 100755 --- a/fetch-perf-libs.sh +++ b/fetch-perf-libs.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -PERF_LIBS_VERSION=v0.14.1 +PERF_LIBS_VERSION=v0.15.0 set -e cd "$(dirname "$0")"