Page-pin packet memory for cuda (#4250)

* Page-pin packet memory for cuda

Bring back recyclers and pin offset buffers

* Add packet recycler to streamer

* Add set_pinnable to sigverify vecs to pin them

* Add packets reset test

* Add test for recycler and reduce the gc lock critical section
* Add comments/tests to cuda_runtime

* Add recycler to recv_blobs path.

* Add trace/names for debug and PacketsRecycler to bench-streamer

* Predict realloc and unpin beforehand.

* Add helper to reserve and pin

* Cap buffered packets length

* Call cuda wrapper functions
This commit is contained in:
sakridge 2019-06-27 09:32:32 +02:00 committed by GitHub
parent 44a572416d
commit fbea9d8621
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 613 additions and 56 deletions

View File

@ -1,4 +1,5 @@
use clap::{crate_description, crate_name, crate_version, App, Arg};
use solana::packet::PacketsRecycler;
use solana::packet::{Packet, Packets, BLOB_SIZE, PACKET_DATA_SIZE};
use solana::result::Result;
use solana::streamer::{receiver, PacketReceiver};
@ -16,7 +17,7 @@ fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> {
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut msgs = Packets::default();
msgs.packets.resize(10, Packet::default());
for w in &mut msgs.packets {
for w in msgs.packets.iter_mut() {
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
}
@ -74,6 +75,7 @@ fn main() -> Result<()> {
let mut read_channels = Vec::new();
let mut read_threads = Vec::new();
let recycler = PacketsRecycler::default();
for _ in 0..num_sockets {
let read = solana_netutil::bind_to(port, false).unwrap();
read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
@ -83,7 +85,13 @@ fn main() -> Result<()> {
let (s_reader, r_reader) = channel();
read_channels.push(r_reader);
read_threads.push(receiver(Arc::new(read), &exit, s_reader));
read_threads.push(receiver(
Arc::new(read),
&exit,
s_reader,
recycler.clone(),
"bench-streamer-test",
));
}
let t_producer1 = producer(&addr, exit.clone());

View File

@ -3,6 +3,7 @@
extern crate test;
use solana::packet::to_packets;
use solana::recycler::Recycler;
use solana::sigverify;
use solana::test_tx::test_tx;
use test::Bencher;
@ -14,8 +15,10 @@ fn bench_sigverify(bencher: &mut Bencher) {
// generate packet vector
let batches = to_packets(&vec![tx; 128]);
let recycler = Recycler::default();
let recycler_out = Recycler::default();
// verify packets
bencher.iter(|| {
let _ans = sigverify::ed25519_verify(&batches);
let _ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
})
}

View File

@ -818,6 +818,9 @@ impl BankingStage {
packet_indexes: Vec<usize>,
) {
if !packet_indexes.is_empty() {
if unprocessed_packets.len() > 400 {
unprocessed_packets.remove(0);
}
unprocessed_packets.push((packets, packet_indexes));
}
}

297
core/src/cuda_runtime.rs Normal file
View File

@ -0,0 +1,297 @@
// Module for cuda-related helper functions and wrappers.
//
// cudaHostRegister/cudaHostUnregister -
// apis for page-pinning memory. Cuda driver/hardware cannot overlap
// copies from host memory to GPU memory unless the memory is page-pinned and
// cannot be paged to disk. The cuda driver provides these interfaces to pin and unpin memory.
use crate::recycler::Reset;
#[cfg(feature = "cuda")]
use crate::sigverify::{cuda_host_register, cuda_host_unregister};
use std::ops::{Deref, DerefMut};
#[cfg(feature = "cuda")]
use std::mem::size_of;
#[cfg(feature = "cuda")]
use core::ffi::c_void;
#[cfg(feature = "cuda")]
use std::os::raw::c_int;
#[cfg(feature = "cuda")]
const CUDA_SUCCESS: c_int = 0;
pub fn pin<T>(_mem: &mut Vec<T>) {
#[cfg(feature = "cuda")]
unsafe {
let err = cuda_host_register(
_mem.as_mut_ptr() as *mut c_void,
_mem.capacity() * size_of::<T>(),
0,
);
if err != CUDA_SUCCESS {
error!(
"cudaHostRegister error: {} ptr: {:?} bytes: {}",
err,
_mem.as_ptr(),
_mem.capacity() * size_of::<T>()
);
}
}
}
pub fn unpin<T>(_mem: *mut T) {
#[cfg(feature = "cuda")]
unsafe {
let err = cuda_host_unregister(_mem as *mut c_void);
if err != CUDA_SUCCESS {
error!("cudaHostUnregister returned: {} ptr: {:?}", err, _mem);
}
}
}
// A vector wrapper where the underlying memory can be
// page-pinned. Controlled by flags in case user only wants
// to pin in certain circumstances.
#[derive(Debug)]
pub struct PinnedVec<T> {
x: Vec<T>,
pinned: bool,
pinnable: bool,
}
impl Reset for PinnedVec<u8> {
fn reset(&mut self) {
self.resize(0, 0u8);
}
}
impl Reset for PinnedVec<u32> {
fn reset(&mut self) {
self.resize(0, 0u32);
}
}
impl<T: Clone> Default for PinnedVec<T> {
fn default() -> Self {
Self {
x: Vec::new(),
pinned: false,
pinnable: false,
}
}
}
impl<T> Deref for PinnedVec<T> {
type Target = Vec<T>;
fn deref(&self) -> &Self::Target {
&self.x
}
}
impl<T> DerefMut for PinnedVec<T> {
fn deref_mut(&mut self) -> &mut Vec<T> {
&mut self.x
}
}
pub struct PinnedIter<'a, T>(std::slice::Iter<'a, T>);
pub struct PinnedIterMut<'a, T>(std::slice::IterMut<'a, T>);
impl<'a, T> Iterator for PinnedIter<'a, T> {
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}
impl<'a, T> Iterator for PinnedIterMut<'a, T> {
type Item = &'a mut T;
fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}
impl<'a, T> IntoIterator for &'a mut PinnedVec<T> {
type Item = &'a T;
type IntoIter = PinnedIter<'a, T>;
fn into_iter(self) -> Self::IntoIter {
PinnedIter(self.iter())
}
}
impl<'a, T> IntoIterator for &'a PinnedVec<T> {
type Item = &'a T;
type IntoIter = PinnedIter<'a, T>;
fn into_iter(self) -> Self::IntoIter {
PinnedIter(self.iter())
}
}
impl<T: Clone> PinnedVec<T> {
pub fn reserve_and_pin(&mut self, size: usize) {
if self.x.capacity() < size {
if self.pinned {
unpin(&mut self.x);
self.pinned = false;
}
self.x.reserve(size);
}
self.set_pinnable();
if !self.pinned {
pin(&mut self.x);
self.pinned = true;
}
}
pub fn set_pinnable(&mut self) {
self.pinnable = true;
}
pub fn from_vec(source: Vec<T>) -> Self {
Self {
x: source,
pinned: false,
pinnable: false,
}
}
pub fn with_capacity(capacity: usize) -> Self {
let x = Vec::with_capacity(capacity);
Self {
x,
pinned: false,
pinnable: false,
}
}
pub fn iter(&self) -> PinnedIter<T> {
PinnedIter(self.x.iter())
}
pub fn iter_mut(&mut self) -> PinnedIterMut<T> {
PinnedIterMut(self.x.iter_mut())
}
pub fn is_empty(&self) -> bool {
self.x.is_empty()
}
pub fn len(&self) -> usize {
self.x.len()
}
#[cfg(feature = "cuda")]
pub fn as_ptr(&self) -> *const T {
self.x.as_ptr()
}
#[cfg(feature = "cuda")]
pub fn as_mut_ptr(&mut self) -> *mut T {
self.x.as_mut_ptr()
}
pub fn push(&mut self, x: T) {
let old_ptr = self.x.as_mut_ptr();
let old_capacity = self.x.capacity();
// Predict realloc and unpin
if self.pinned && self.x.capacity() == self.x.len() {
unpin(old_ptr);
self.pinned = false;
}
self.x.push(x);
self.check_ptr(old_ptr, old_capacity, "push");
}
pub fn resize(&mut self, size: usize, elem: T) {
let old_ptr = self.x.as_mut_ptr();
let old_capacity = self.x.capacity();
// Predict realloc and unpin.
if self.pinned && self.x.capacity() < size {
unpin(old_ptr);
self.pinned = false;
}
self.x.resize(size, elem);
self.check_ptr(old_ptr, old_capacity, "resize");
}
fn check_ptr(&mut self, _old_ptr: *mut T, _old_capacity: usize, _from: &'static str) {
#[cfg(feature = "cuda")]
{
if self.pinnable && (self.x.as_ptr() != _old_ptr || self.x.capacity() != _old_capacity)
{
if self.pinned {
unpin(_old_ptr);
}
trace!(
"pinning from check_ptr old: {} size: {} from: {}",
_old_capacity,
self.x.capacity(),
_from
);
pin(&mut self.x);
self.pinned = true;
}
}
}
}
impl<T: Clone> Clone for PinnedVec<T> {
fn clone(&self) -> Self {
let mut x = self.x.clone();
let pinned = if self.pinned {
pin(&mut x);
true
} else {
false
};
debug!(
"clone PinnedVec: size: {} pinned?: {} pinnable?: {}",
self.x.capacity(),
self.pinned,
self.pinnable
);
Self {
x,
pinned,
pinnable: self.pinnable,
}
}
}
impl<T> Drop for PinnedVec<T> {
fn drop(&mut self) {
if self.pinned {
unpin(self.x.as_mut_ptr());
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pinned_vec() {
let mut mem = PinnedVec::with_capacity(10);
mem.set_pinnable();
mem.push(50);
mem.resize(2, 10);
assert_eq!(mem[0], 50);
assert_eq!(mem[1], 10);
assert_eq!(mem.len(), 2);
assert_eq!(mem.is_empty(), false);
let mut iter = mem.iter();
assert_eq!(*iter.next().unwrap(), 50);
assert_eq!(*iter.next().unwrap(), 10);
assert_eq!(iter.next(), None);
}
}

View File

@ -2,6 +2,7 @@
use crate::banking_stage::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET;
use crate::poh_recorder::PohRecorder;
use crate::recycler::Recycler;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::streamer::{self, PacketReceiver, PacketSender};
@ -87,9 +88,16 @@ impl FetchStage {
sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> Self {
let tpu_threads = sockets
.into_iter()
.map(|socket| streamer::receiver(socket, &exit, sender.clone()));
let recycler = Recycler::default();
let tpu_threads = sockets.into_iter().map(|socket| {
streamer::receiver(
socket,
&exit,
sender.clone(),
recycler.clone(),
"fetch_stage",
)
});
let (forward_sender, forward_receiver) = channel();
let tpu_via_blobs_threads = tpu_via_blobs_sockets

View File

@ -13,6 +13,7 @@ pub mod chacha;
#[cfg(cuda)]
pub mod chacha_cuda;
pub mod cluster_info_vote_listener;
pub mod recycler;
#[macro_use]
pub mod contact_info;
pub mod crds;
@ -31,6 +32,7 @@ pub mod cluster_info;
pub mod cluster_info_repair_listener;
pub mod cluster_tests;
pub mod consensus;
pub mod cuda_runtime;
pub mod entry;
pub mod erasure;
pub mod fetch_stage;

View File

@ -1,5 +1,7 @@
//! The `packet` module defines data structures and methods to pull data from the network.
use crate::cuda_runtime::PinnedVec;
use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
use crate::recycler::{Recycler, Reset};
use crate::result::{Error, Result};
use bincode;
use byteorder::{ByteOrder, LittleEndian};
@ -16,6 +18,7 @@ use std::fmt;
use std::io;
use std::io::Cursor;
use std::io::Write;
use std::mem;
use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use std::ops::{Deref, DerefMut};
@ -124,21 +127,61 @@ impl Meta {
#[derive(Debug, Clone)]
pub struct Packets {
pub packets: Vec<Packet>,
pub packets: PinnedVec<Packet>,
recycler: Option<PacketsRecycler>,
}
impl Drop for Packets {
fn drop(&mut self) {
if let Some(ref recycler) = self.recycler {
let old = mem::replace(&mut self.packets, PinnedVec::default());
recycler.recycle(old)
}
}
}
impl Reset for Packets {
fn reset(&mut self) {
self.packets.resize(0, Packet::default());
}
}
impl Reset for PinnedVec<Packet> {
fn reset(&mut self) {
self.resize(0, Packet::default());
}
}
//auto derive doesn't support large arrays
impl Default for Packets {
fn default() -> Packets {
let packets = PinnedVec::with_capacity(NUM_RCVMMSGS);
Packets {
packets: Vec::with_capacity(NUM_RCVMMSGS),
packets,
recycler: None,
}
}
}
pub type PacketsRecycler = Recycler<PinnedVec<Packet>>;
impl Packets {
pub fn new(packets: Vec<Packet>) -> Self {
Self { packets }
let packets = PinnedVec::from_vec(packets);
Self {
packets,
recycler: None,
}
}
pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self {
let mut packets = recycler.allocate(name);
packets.reserve_and_pin(size);
Packets {
packets,
recycler: Some(recycler),
}
}
pub fn set_addr(&mut self, addr: &SocketAddr) {
@ -516,9 +559,8 @@ impl Blob {
}
// other side of store_packets
pub fn load_packets(&self) -> Vec<Packet> {
pub fn load_packets(&self, packets: &mut PinnedVec<Packet>) {
// rough estimate
let mut packets: Vec<Packet> = Vec::with_capacity(self.size() / PACKET_DATA_SIZE);
let mut pos = 0;
let size_len = bincode::serialized_size(&0usize).unwrap() as usize;
@ -538,7 +580,6 @@ impl Blob {
pos += size;
packets.push(packet);
}
packets
}
pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> {
@ -652,7 +693,7 @@ mod tests {
// test that the address is actually being updated
let send_addr = socketaddr!([127, 0, 0, 1], 123);
let packets = vec![Packet::default()];
let mut msgs = Packets { packets };
let mut msgs = Packets::new(packets);
msgs.set_addr(&send_addr);
assert_eq!(SocketAddr::from(msgs.packets[0].meta.addr()), send_addr);
}
@ -678,7 +719,7 @@ mod tests {
assert_eq!(recvd, p.packets.len());
for m in p.packets {
for m in &p.packets {
assert_eq!(m.meta.size, PACKET_DATA_SIZE);
assert_eq!(m.meta.addr(), saddr);
}
@ -810,10 +851,12 @@ mod tests {
let blobs = packets_to_blobs(&packets[..]);
let reconstructed_packets: Vec<Packet> =
blobs.iter().flat_map(|b| b.load_packets()).collect();
let mut reconstructed_packets = PinnedVec::default();
blobs
.iter()
.for_each(|b| b.load_packets(&mut reconstructed_packets));
assert_eq!(reconstructed_packets, packets);
assert_eq!(reconstructed_packets[..], packets[..]);
}
#[test]
@ -862,4 +905,13 @@ mod tests {
b.sign(&k);
assert!(b.verify());
}
#[test]
fn test_packets_reset() {
let mut packets = Packets::default();
packets.packets.resize(10, Packet::default());
assert_eq!(packets.packets.len(), 10);
packets.reset();
assert_eq!(packets.packets.len(), 0);
}
}

111
core/src/recycler.rs Normal file
View File

@ -0,0 +1,111 @@
use rand::{thread_rng, Rng};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
#[derive(Debug, Default)]
struct RecyclerStats {
total: AtomicUsize,
reuse: AtomicUsize,
max_gc: AtomicUsize,
}
#[derive(Debug)]
pub struct Recycler<T> {
gc: Arc<Mutex<Vec<T>>>,
stats: Arc<RecyclerStats>,
id: usize,
}
impl<T: Default> Default for Recycler<T> {
fn default() -> Recycler<T> {
let id = thread_rng().gen_range(0, 1000);
trace!("new recycler..{}", id);
Recycler {
gc: Arc::new(Mutex::new(vec![])),
stats: Arc::new(RecyclerStats::default()),
id,
}
}
}
impl<T: Default> Clone for Recycler<T> {
fn clone(&self) -> Recycler<T> {
Recycler {
gc: self.gc.clone(),
stats: self.stats.clone(),
id: self.id,
}
}
}
pub trait Reset {
fn reset(&mut self);
}
impl<T: Default + Reset> Recycler<T> {
pub fn allocate(&self, name: &'static str) -> T {
let new = self
.gc
.lock()
.expect("recycler lock in pb fn allocate")
.pop();
if let Some(mut x) = new {
self.stats.reuse.fetch_add(1, Ordering::Relaxed);
x.reset();
return x;
}
trace!(
"allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
self.stats.total.fetch_add(1, Ordering::Relaxed),
name,
self.id,
self.stats.reuse.load(Ordering::Relaxed),
self.stats.max_gc.load(Ordering::Relaxed),
);
T::default()
}
pub fn recycle(&self, x: T) {
let len = {
let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
gc.push(x);
gc.len()
};
let max_gc = self.stats.max_gc.load(Ordering::Relaxed);
if len > max_gc {
// this is not completely accurate, but for most cases should be fine.
self.stats
.max_gc
.compare_and_swap(max_gc, len, Ordering::Relaxed);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
impl Reset for u64 {
fn reset(&mut self) {
*self = 10;
}
}
#[test]
fn test_recycler() {
let recycler = Recycler::default();
let mut y: u64 = recycler.allocate("test_recycler1");
assert_eq!(y, 0);
y = 20;
let recycler2 = recycler.clone();
recycler2.recycle(y);
assert_eq!(recycler.gc.lock().unwrap().len(), 1);
let z = recycler.allocate("test_recycler2");
assert_eq!(z, 10);
assert_eq!(recycler.gc.lock().unwrap().len(), 0);
}
}

View File

@ -5,6 +5,7 @@ use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE};
use crate::contact_info::ContactInfo;
use crate::gossip_service::GossipService;
use crate::packet::to_shared_blob;
use crate::recycler::Recycler;
use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy};
use crate::result::{Error, Result};
use crate::service::Service;
@ -121,7 +122,14 @@ fn create_request_processor(
let (s_reader, r_reader) = channel();
let (s_responder, r_responder) = channel();
let storage_socket = Arc::new(socket);
let t_receiver = receiver(storage_socket.clone(), exit, s_reader);
let recycler = Recycler::default();
let t_receiver = receiver(
storage_socket.clone(),
exit,
s_reader,
recycler,
"replicator",
);
thread_handles.push(t_receiver);
let t_responder = responder("replicator-responder", storage_socket.clone(), r_responder);

View File

@ -4,7 +4,9 @@
//! offloaded to the GPU.
//!
use crate::cuda_runtime::PinnedVec;
use crate::packet::{Packet, Packets};
use crate::recycler::Recycler;
use crate::result::Result;
use bincode::serialized_size;
use rayon::ThreadPool;
@ -18,7 +20,10 @@ use solana_sdk::transaction::Transaction;
use std::mem::size_of;
#[cfg(feature = "cuda")]
use std::os::raw::c_int;
use std::os::raw::{c_int, c_uint};
#[cfg(feature = "cuda")]
use core::ffi::c_void;
pub const NUM_THREADS: u32 = 10;
use std::cell::RefCell;
@ -28,7 +33,9 @@ thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::
.build()
.unwrap()));
type TxOffsets = (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>, Vec<Vec<u32>>);
pub type TxOffset = PinnedVec<u32>;
type TxOffsets = (TxOffset, TxOffset, TxOffset, TxOffset, Vec<Vec<u32>>);
#[cfg(feature = "cuda")]
#[repr(C)]
@ -78,6 +85,9 @@ extern "C" {
num_elems: usize,
use_non_default_stream: u8,
) -> c_int;
pub fn cuda_host_register(ptr: *mut c_void, size: usize, flags: c_uint) -> c_int;
pub fn cuda_host_unregister(ptr: *mut c_void) -> c_int;
}
#[cfg(not(feature = "cuda"))]
@ -122,7 +132,11 @@ fn batch_size(batches: &[Packets]) -> usize {
}
#[cfg(not(feature = "cuda"))]
pub fn ed25519_verify(batches: &[Packets]) -> Vec<Vec<u8>> {
pub fn ed25519_verify(
batches: &[Packets],
_recycler: &Recycler<TxOffset>,
_recycler_out: &Recycler<PinnedVec<u8>>,
) -> Vec<Vec<u8>> {
ed25519_verify_cpu(batches)
}
@ -145,11 +159,16 @@ pub fn get_packet_offsets(packet: &Packet, current_offset: u32) -> (u32, u32, u3
)
}
pub fn generate_offsets(batches: &[Packets]) -> Result<TxOffsets> {
let mut signature_offsets: Vec<_> = Vec::new();
let mut pubkey_offsets: Vec<_> = Vec::new();
let mut msg_start_offsets: Vec<_> = Vec::new();
let mut msg_sizes: Vec<_> = Vec::new();
pub fn generate_offsets(batches: &[Packets], recycler: &Recycler<TxOffset>) -> Result<TxOffsets> {
debug!("allocating..");
let mut signature_offsets: PinnedVec<_> = recycler.allocate("sig_offsets");
signature_offsets.set_pinnable();
let mut pubkey_offsets: PinnedVec<_> = recycler.allocate("pubkey_offsets");
pubkey_offsets.set_pinnable();
let mut msg_start_offsets: PinnedVec<_> = recycler.allocate("msg_start_offsets");
msg_start_offsets.set_pinnable();
let mut msg_sizes: PinnedVec<_> = recycler.allocate("msg_size_offsets");
msg_sizes.set_pinnable();
let mut current_packet = 0;
let mut v_sig_lens = Vec::new();
batches.iter().for_each(|p| {
@ -229,7 +248,11 @@ pub fn init() {
}
#[cfg(feature = "cuda")]
pub fn ed25519_verify(batches: &[Packets]) -> Vec<Vec<u8>> {
pub fn ed25519_verify(
batches: &[Packets],
recycler: &Recycler<TxOffset>,
recycler_out: &Recycler<PinnedVec<u8>>,
) -> Vec<Vec<u8>> {
use crate::packet::PACKET_DATA_SIZE;
let count = batch_size(batches);
@ -243,10 +266,12 @@ pub fn ed25519_verify(batches: &[Packets]) -> Vec<Vec<u8>> {
}
let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) =
generate_offsets(batches).unwrap();
generate_offsets(batches, recycler).unwrap();
debug!("CUDA ECDSA for {}", batch_size(batches));
let mut out = Vec::new();
debug!("allocating out..");
let mut out = recycler_out.allocate("out_buffer");
out.set_pinnable();
let mut elems = Vec::new();
let mut rvs = Vec::new();
@ -303,6 +328,11 @@ pub fn ed25519_verify(batches: &[Packets]) -> Vec<Vec<u8>> {
}
}
inc_new_counter_debug!("ed25519_verify_gpu", count);
recycler_out.recycle(out);
recycler.recycle(signature_offsets);
recycler.recycle(pubkey_offsets);
recycler.recycle(msg_sizes);
recycler.recycle(msg_start_offsets);
rvs
}
@ -320,6 +350,7 @@ pub fn make_packet_from_transaction(tx: Transaction) -> Packet {
#[cfg(test)]
mod tests {
use crate::packet::{Packet, Packets};
use crate::recycler::Recycler;
use crate::sigverify;
use crate::test_tx::{test_multisig_tx, test_tx};
use bincode::{deserialize, serialize};
@ -461,8 +492,10 @@ mod tests {
let batches = generate_packet_vec(&packet, n, 2);
let recycler = Recycler::default();
let recycler_out = Recycler::default();
// verify packets
let ans = sigverify::ed25519_verify(&batches);
let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
// check result
let ref_ans = if modify_data { 0u8 } else { 1u8 };
@ -499,8 +532,10 @@ mod tests {
batches[0].packets.push(packet);
let recycler = Recycler::default();
let recycler_out = Recycler::default();
// verify packets
let ans = sigverify::ed25519_verify(&batches);
let ans = sigverify::ed25519_verify(&batches, &recycler, &recycler_out);
// check result
let ref_ans = 1u8;

View File

@ -5,10 +5,13 @@
//! transaction. All processing is done on the CPU by default and on a GPU
//! if the `cuda` feature is enabled with `--features=cuda`.
use crate::cuda_runtime::PinnedVec;
use crate::packet::Packets;
use crate::recycler::Recycler;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::sigverify;
use crate::sigverify::TxOffset;
use crate::streamer::{self, PacketReceiver};
use crossbeam_channel::Sender as CrossbeamSender;
use solana_metrics::{datapoint_info, inc_new_counter_info};
@ -19,7 +22,7 @@ use std::thread::{self, Builder, JoinHandle};
use std::time::Instant;
#[cfg(feature = "cuda")]
const RECV_BATCH_MAX: usize = 60_000;
const RECV_BATCH_MAX: usize = 5_000;
#[cfg(not(feature = "cuda"))]
const RECV_BATCH_MAX: usize = 1000;
@ -43,11 +46,16 @@ impl SigVerifyStage {
Self { thread_hdls }
}
fn verify_batch(batch: Vec<Packets>, sigverify_disabled: bool) -> VerifiedPackets {
fn verify_batch(
batch: Vec<Packets>,
sigverify_disabled: bool,
recycler: &Recycler<TxOffset>,
recycler_out: &Recycler<PinnedVec<u8>>,
) -> VerifiedPackets {
let r = if sigverify_disabled {
sigverify::ed25519_verify_disabled(&batch)
} else {
sigverify::ed25519_verify(&batch)
sigverify::ed25519_verify(&batch, recycler, recycler_out)
};
batch.into_iter().zip(r).collect()
}
@ -57,6 +65,8 @@ impl SigVerifyStage {
sendr: &CrossbeamSender<VerifiedPackets>,
sigverify_disabled: bool,
id: usize,
recycler: &Recycler<TxOffset>,
recycler_out: &Recycler<PinnedVec<u8>>,
) -> Result<()> {
let (batch, len, recv_time) = streamer::recv_batch(
&recvr.lock().expect("'recvr' lock in fn verifier"),
@ -69,11 +79,11 @@ impl SigVerifyStage {
debug!(
"@{:?} verifier: verifying: {} id: {}",
timing::timestamp(),
batch.len(),
len,
id
);
let verified_batch = Self::verify_batch(batch, sigverify_disabled);
let verified_batch = Self::verify_batch(batch, sigverify_disabled, recycler, recycler_out);
inc_new_counter_info!("sigverify_stage-verified_packets_send", len);
if sendr.send(verified_batch).is_err() {
@ -114,17 +124,26 @@ impl SigVerifyStage {
) -> JoinHandle<()> {
Builder::new()
.name(format!("solana-verifier-{}", id))
.spawn(move || loop {
if let Err(e) =
Self::verifier(&packet_receiver, &verified_sender, sigverify_disabled, id)
{
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::SendError => {
break;
.spawn(move || {
let recycler = Recycler::default();
let recycler_out = Recycler::default();
loop {
if let Err(e) = Self::verifier(
&packet_receiver,
&verified_sender,
sigverify_disabled,
id,
&recycler,
&recycler_out,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::SendError => {
break;
}
_ => error!("{:?}", e),
}
_ => error!("{:?}", e),
}
}
})

View File

@ -1,7 +1,7 @@
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
//!
use crate::packet::{Blob, Packets, SharedBlobs};
use crate::packet::{Blob, Packets, PacketsRecycler, SharedBlobs};
use crate::result::{Error, Result};
use solana_sdk::timing::duration_as_ms;
use std::net::UdpSocket;
@ -16,9 +16,15 @@ pub type PacketSender = Sender<Packets>;
pub type BlobSender = Sender<SharedBlobs>;
pub type BlobReceiver = Receiver<SharedBlobs>;
fn recv_loop(sock: &UdpSocket, exit: Arc<AtomicBool>, channel: &PacketSender) -> Result<()> {
fn recv_loop(
sock: &UdpSocket,
exit: Arc<AtomicBool>,
channel: &PacketSender,
recycler: &PacketsRecycler,
name: &'static str,
) -> Result<()> {
loop {
let mut msgs = Packets::default();
let mut msgs = Packets::new_with_recycler(recycler.clone(), 256, name);
loop {
// Check for exit signal, even if socket is busy
// (for instance the leader trasaction socket)
@ -37,6 +43,8 @@ pub fn receiver(
sock: Arc<UdpSocket>,
exit: &Arc<AtomicBool>,
packet_sender: PacketSender,
recycler: PacketsRecycler,
name: &'static str,
) -> JoinHandle<()> {
let res = sock.set_read_timeout(Some(Duration::new(1, 0)));
if res.is_err() {
@ -46,7 +54,7 @@ pub fn receiver(
Builder::new()
.name("solana-receiver".to_string())
.spawn(move || {
let _ = recv_loop(&sock, exit, &packet_sender);
let _ = recv_loop(&sock, exit, &packet_sender, &recycler.clone(), name);
})
.unwrap()
}
@ -126,7 +134,7 @@ pub fn blob_receiver(
.unwrap()
}
fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> {
fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender, recycler: &PacketsRecycler) -> Result<()> {
trace!(
"recv_blob_packets: receiving on {}",
sock.local_addr().unwrap()
@ -134,8 +142,9 @@ fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> {
let blobs = Blob::recv_from(sock)?;
for blob in blobs {
let packets = blob.read().unwrap().load_packets();
s.send(Packets::new(packets))?;
let mut packets = Packets::new_with_recycler(recycler.clone(), 256, "recv_blob_packets");
blob.read().unwrap().load_packets(&mut packets.packets);
s.send(packets)?;
}
Ok(())
@ -152,13 +161,14 @@ pub fn blob_packet_receiver(
sock.set_read_timeout(Some(timer))
.expect("set socket timeout");
let exit = exit.clone();
let recycler = PacketsRecycler::default();
Builder::new()
.name("solana-blob_packet_receiver".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_blob_packets(&sock, &s);
let _ = recv_blob_packets(&sock, &s, &recycler);
})
.unwrap()
}
@ -167,6 +177,7 @@ pub fn blob_packet_receiver(
mod test {
use super::*;
use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
use crate::recycler::Recycler;
use crate::streamer::{receiver, responder};
use std::io;
use std::io::Write;
@ -207,7 +218,7 @@ mod test {
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = channel();
let t_receiver = receiver(Arc::new(read), &exit, s_reader);
let t_receiver = receiver(Arc::new(read), &exit, s_reader, Recycler::default(), "test");
let t_responder = {
let (s_responder, r_responder) = channel();
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);

View File

@ -1,6 +1,6 @@
#!/usr/bin/env bash
PERF_LIBS_VERSION=v0.14.1
PERF_LIBS_VERSION=v0.15.0
set -e
cd "$(dirname "$0")"