Delete SharedPackets (#3843)

* Delete SharedPackets

* Fix bench and sigverify
Sagar Dhawan, 2019-04-17 18:15:50 -07:00 (committed by GitHub)
parent 9ccd362461
commit 9c2809db21
8 changed files with 62 additions and 93 deletions

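What the diff amounts to: `SharedPackets` was an alias for `Arc<RwLock<Packets>>` (deleted from packet.rs below), so every consumer paid a lock round-trip even for a plain length check. Deriving `Clone` on `Packets` lets batches move through channels and threads by value instead. A minimal self-contained sketch of the before/after access pattern, with stand-in types rather than the crate's real definitions:

use std::sync::{Arc, RwLock};

#[derive(Debug, Default, Clone)]
struct Packet; // stand-in for solana::packet::Packet

#[derive(Debug, Default, Clone)]
struct Packets {
    packets: Vec<Packet>, // same field shape as the real struct
}

// Before this commit: every batch was wrapped in a lock.
type SharedPackets = Arc<RwLock<Packets>>;

fn len_before(batch: &SharedPackets) -> usize {
    batch.read().unwrap().packets.len() // lock + unwrap on every access
}

// After: plain ownership, plain field access.
fn len_after(batch: &Packets) -> usize {
    batch.packets.len()
}

fn main() {
    let shared = SharedPackets::default();
    let plain = Packets::default();
    assert_eq!(len_before(&shared), len_after(&plain));
}
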
View File

@@ -1,5 +1,5 @@
 use clap::{crate_description, crate_name, crate_version, App, Arg};
-use solana::packet::{Packet, SharedPackets, BLOB_SIZE, PACKET_DATA_SIZE};
+use solana::packet::{Packet, Packets, BLOB_SIZE, PACKET_DATA_SIZE};
 use solana::result::Result;
 use solana::streamer::{receiver, PacketReceiver};
 use std::cmp::max;
@@ -14,19 +14,19 @@ use std::time::SystemTime;
 fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> {
     let send = UdpSocket::bind("0.0.0.0:0").unwrap();
-    let msgs = SharedPackets::default();
-    let msgs_ = msgs.clone();
-    msgs.write().unwrap().packets.resize(10, Packet::default());
-    for w in &mut msgs.write().unwrap().packets {
+    let mut msgs = Packets::default();
+    msgs.packets.resize(10, Packet::default());
+    for w in &mut msgs.packets {
         w.meta.size = PACKET_DATA_SIZE;
         w.meta.set_addr(&addr);
     }
+    let msgs_ = msgs.clone();
     spawn(move || loop {
         if exit.load(Ordering::Relaxed) {
             return;
         }
         let mut num = 0;
-        for p in &msgs_.read().unwrap().packets {
+        for p in &msgs_.packets {
             let a = p.meta.addr();
             assert!(p.meta.size < BLOB_SIZE);
             send.send_to(&p.data[..p.meta.size], &a).unwrap();
@@ -43,7 +43,7 @@ fn sink(exit: Arc<AtomicBool>, rvs: Arc<AtomicUsize>, r: PacketReceiver) -> Join
         }
         let timer = Duration::new(1, 0);
         if let Ok(msgs) = r.recv_timeout(timer) {
-            rvs.fetch_add(msgs.read().unwrap().packets.len(), Ordering::Relaxed);
+            rvs.fetch_add(msgs.packets.len(), Ordering::Relaxed);
         }
     })
 }

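One behavioral note on the producer above: `msgs.clone()` is now a deep copy of the packet buffer rather than an `Arc` refcount bump, which is cheap enough for a bench that builds the batch once. A runnable sketch of the pattern with a stand-in `Packets`:

use std::thread::{spawn, JoinHandle};

#[derive(Debug, Default, Clone)]
struct Packets {
    packets: Vec<u8>, // stand-in payload; the real Packet carries data plus meta
}

fn producer_sketch() -> JoinHandle<usize> {
    let mut msgs = Packets::default();
    msgs.packets.resize(10, 0); // build the batch mutably, no write() lock
    let msgs_ = msgs.clone(); // deep copy, not a reference-count bump
    spawn(move || msgs_.packets.len()) // the clone moves into the thread
}

fn main() {
    assert_eq!(producer_sketch().join().unwrap(), 10);
}
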
View File

@@ -103,7 +103,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
     let verified: Vec<_> = to_packets_chunked(&transactions.clone(), 192)
         .into_iter()
         .map(|x| {
-            let len = x.read().unwrap().packets.len();
+            let len = x.packets.len();
             (x, iter::repeat(1).take(len).collect())
         })
         .collect();
@@ -218,7 +218,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
     let verified: Vec<_> = to_packets_chunked(&transactions.clone(), 96)
         .into_iter()
         .map(|x| {
-            let len = x.read().unwrap().packets.len();
+            let len = x.packets.len();
             (x, iter::repeat(1).take(len).collect())
         })
         .collect();

View File

@@ -8,7 +8,6 @@ use crate::entry;
 use crate::entry::{hash_transactions, Entry};
 use crate::leader_schedule_utils;
 use crate::packet;
-use crate::packet::SharedPackets;
 use crate::packet::{Packet, Packets};
 use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries};
 use crate::poh_service::{PohService, PohServiceConfig};
@@ -31,7 +30,7 @@ use std::time::Duration;
 use std::time::Instant;
 use sys_info;
 
-pub type UnprocessedPackets = Vec<(SharedPackets, usize, Vec<u8>)>; // `usize` is the index of the first unprocessed packet in `SharedPackets`
+pub type UnprocessedPackets = Vec<(Packets, usize, Vec<u8>)>; // `usize` is the index of the first unprocessed packet in `SharedPackets`
 
 // number of threads is 1 until mt bank is ready
 pub const NUM_THREADS: u32 = 10;
@@ -105,11 +104,11 @@ impl BankingStage {
     fn forward_unprocessed_packets(
         socket: &std::net::UdpSocket,
         tpu_via_blobs: &std::net::SocketAddr,
-        unprocessed_packets: &[(SharedPackets, usize, Vec<u8>)],
+        unprocessed_packets: &[(Packets, usize, Vec<u8>)],
     ) -> std::io::Result<()> {
         let locked_packets: Vec<_> = unprocessed_packets
             .iter()
-            .map(|(p, start_index, _)| (p.read().unwrap(), start_index))
+            .map(|(p, start_index, _)| (p, start_index))
             .collect();
         let packets: Vec<&Packet> = locked_packets
             .iter()
@@ -127,7 +126,7 @@ impl BankingStage {
 
     fn process_buffered_packets(
         poh_recorder: &Arc<Mutex<PohRecorder>>,
-        buffered_packets: &[(SharedPackets, usize, Vec<u8>)],
+        buffered_packets: &[(Packets, usize, Vec<u8>)],
     ) -> Result<UnprocessedPackets> {
         let mut unprocessed_packets = vec![];
         let mut bank_shutdown = false;
@@ -195,7 +194,7 @@ impl BankingStage {
         socket: &std::net::UdpSocket,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        buffered_packets: &[(SharedPackets, usize, Vec<u8>)],
+        buffered_packets: &[(Packets, usize, Vec<u8>)],
     ) -> Result<UnprocessedPackets> {
         let rcluster_info = cluster_info.read().unwrap();
@@ -458,14 +457,13 @@ impl BankingStage {
     fn process_received_packets(
         bank: &Arc<Bank>,
         poh: &Arc<Mutex<PohRecorder>>,
-        msgs: &Arc<RwLock<Packets>>,
+        msgs: &Packets,
         vers: &[u8],
         offset: usize,
     ) -> Result<(usize, Vec<Transaction>, Vec<usize>)> {
         debug!("banking-stage-tx bank {}", bank.slot());
-        let transactions = Self::deserialize_transactions(&Packets::new(
-            msgs.read().unwrap().packets[offset..].to_owned(),
-        ));
+        let transactions =
+            Self::deserialize_transactions(&Packets::new(msgs.packets[offset..].to_owned()));
         let vers = vers[offset..].to_owned();

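The `UnprocessedPackets` tuples keep the index of the first unprocessed packet; with the lock gone, forwarding can walk the batches by plain reference. A hedged sketch of that walk with stand-in types; the hunk above truncates before the flattening step, so the skip-from-index detail is an assumption:

#[derive(Debug, Default, Clone)]
struct Packet;

#[derive(Debug, Default, Clone)]
struct Packets {
    packets: Vec<Packet>,
}

// Mirrors the (Packets, usize, Vec<u8>) tuple: batch, first-unprocessed
// index, and per-packet sigverify results.
type UnprocessedPackets = Vec<(Packets, usize, Vec<u8>)>;

fn collect_unprocessed(unprocessed: &UnprocessedPackets) -> Vec<&Packet> {
    unprocessed
        .iter()
        .flat_map(|(p, start_index, _)| p.packets.iter().skip(*start_index))
        .collect()
}

fn main() {
    let batch = Packets { packets: vec![Packet; 5] };
    let unprocessed: UnprocessedPackets = vec![(batch, 2, vec![1; 5])];
    assert_eq!(collect_unprocessed(&unprocessed).len(), 3); // packets 2..5
}
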
View File

@@ -18,7 +18,6 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
 use std::ops::{Deref, DerefMut};
 use std::sync::{Arc, RwLock};
 
-pub type SharedPackets = Arc<RwLock<Packets>>;
 pub type SharedBlob = Arc<RwLock<Blob>>;
 pub type SharedBlobs = Vec<SharedBlob>;
@@ -117,7 +116,7 @@ impl Meta {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Packets {
     pub packets: Vec<Packet>,
 }
@@ -254,15 +253,12 @@ impl Packets {
     }
 }
 
-pub fn to_packets_chunked<T: Serialize>(xs: &[T], chunks: usize) -> Vec<SharedPackets> {
+pub fn to_packets_chunked<T: Serialize>(xs: &[T], chunks: usize) -> Vec<Packets> {
     let mut out = vec![];
     for x in xs.chunks(chunks) {
-        let p = SharedPackets::default();
-        p.write()
-            .unwrap()
-            .packets
-            .resize(x.len(), Packet::default());
-        for (i, o) in x.iter().zip(p.write().unwrap().packets.iter_mut()) {
+        let mut p = Packets::default();
+        p.packets.resize(x.len(), Packet::default());
+        for (i, o) in x.iter().zip(p.packets.iter_mut()) {
             let mut wr = io::Cursor::new(&mut o.data[..]);
             bincode::serialize_into(&mut wr, &i).expect("serialize request");
             let len = wr.position() as usize;
@@ -273,7 +269,7 @@ pub fn to_packets_chunked<T: Serialize>(xs: &[T], chunks: usize) -> Vec<SharedPa
     out
 }
 
-pub fn to_packets<T: Serialize>(xs: &[T]) -> Vec<SharedPackets> {
+pub fn to_packets<T: Serialize>(xs: &[T]) -> Vec<Packets> {
     to_packets_chunked(xs, NUM_PACKETS)
 }
@@ -642,16 +638,16 @@ mod tests {
         let tx = system_transaction::create_user_account(&keypair, &keypair.pubkey(), 1, hash, 0);
         let rv = to_packets(&vec![tx.clone(); 1]);
         assert_eq!(rv.len(), 1);
-        assert_eq!(rv[0].read().unwrap().packets.len(), 1);
+        assert_eq!(rv[0].packets.len(), 1);
 
         let rv = to_packets(&vec![tx.clone(); NUM_PACKETS]);
         assert_eq!(rv.len(), 1);
-        assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
+        assert_eq!(rv[0].packets.len(), NUM_PACKETS);
 
         let rv = to_packets(&vec![tx.clone(); NUM_PACKETS + 1]);
         assert_eq!(rv.len(), 2);
-        assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
-        assert_eq!(rv[1].read().unwrap().packets.len(), 1);
+        assert_eq!(rv[0].packets.len(), NUM_PACKETS);
+        assert_eq!(rv[1].packets.len(), 1);
     }
 
     #[test]

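With `to_packets_chunked` returning an owned `Vec<Packets>`, the chunking behavior the tests above assert is easy to model. A self-contained sketch with serialization elided; `NUM_PACKETS = 1024` is an assumed value for illustration, the real constant lives in packet.rs:

#[derive(Debug, Default, Clone)]
struct Packets {
    packets: Vec<u64>, // stand-in: the real version serializes T into Packet buffers
}

const NUM_PACKETS: usize = 1024; // assumption for the sketch

fn to_packets_chunked(xs: &[u64], chunks: usize) -> Vec<Packets> {
    xs.chunks(chunks)
        .map(|x| Packets { packets: x.to_vec() })
        .collect()
}

fn main() {
    let rv = to_packets_chunked(&vec![7; NUM_PACKETS + 1], NUM_PACKETS);
    assert_eq!(rv.len(), 2);
    assert_eq!(rv[0].packets.len(), NUM_PACKETS); // full first chunk
    assert_eq!(rv[1].packets.len(), 1); // remainder, no read() needed to check
}
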
View File

@@ -139,7 +139,7 @@ fn create_request_processor(
     let t_processor = spawn(move || loop {
         let packets = r_reader.recv_timeout(Duration::from_secs(1));
         if let Ok(packets) = packets {
-            for packet in &packets.read().unwrap().packets {
+            for packet in &packets.packets {
                 let req: result::Result<ReplicatorRequest, Box<bincode::ErrorKind>> =
                     deserialize(&packet.data[..packet.meta.size]);
                 match req {

View File

@@ -4,7 +4,7 @@
 //! offloaded to the GPU.
 //!
 
-use crate::packet::{Packet, SharedPackets};
+use crate::packet::{Packet, Packets};
 use crate::result::Result;
 use solana_metrics::counter::Counter;
 use solana_sdk::pubkey::Pubkey;
@@ -111,15 +111,12 @@ fn verify_packet_disabled(_packet: &Packet) -> u8 {
     1
 }
 
-fn batch_size(batches: &[SharedPackets]) -> usize {
-    batches
-        .iter()
-        .map(|p| p.read().unwrap().packets.len())
-        .sum()
+fn batch_size(batches: &[Packets]) -> usize {
+    batches.iter().map(|p| p.packets.len()).sum()
 }
 
 #[cfg(not(feature = "cuda"))]
-pub fn ed25519_verify(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
+pub fn ed25519_verify(batches: &[Packets]) -> Vec<Vec<u8>> {
     ed25519_verify_cpu(batches)
 }
@@ -141,7 +138,7 @@ pub fn get_packet_offsets(packet: &Packet, current_offset: u32) -> (u32, u32, u3
     )
 }
 
-pub fn generate_offsets(batches: &[SharedPackets]) -> Result<TxOffsets> {
+pub fn generate_offsets(batches: &[Packets]) -> Result<TxOffsets> {
     let mut signature_offsets: Vec<_> = Vec::new();
     let mut pubkey_offsets: Vec<_> = Vec::new();
     let mut msg_start_offsets: Vec<_> = Vec::new();
@@ -150,7 +147,7 @@ pub fn generate_offsets(batches: &[SharedPackets]) -> Result<TxOffsets> {
     let mut v_sig_lens = Vec::new();
     batches.iter().for_each(|p| {
         let mut sig_lens = Vec::new();
-        p.read().unwrap().packets.iter().for_each(|packet| {
+        p.packets.iter().for_each(|packet| {
             let current_offset = current_packet as u32 * size_of::<Packet>() as u32;
 
             let (sig_len, sig_start, msg_start_offset, pubkey_offset) =
@@ -185,39 +182,25 @@ pub fn generate_offsets(batches: &[SharedPackets]) -> Result<TxOffsets> {
     ))
 }
 
-pub fn ed25519_verify_cpu(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
+pub fn ed25519_verify_cpu(batches: &[Packets]) -> Vec<Vec<u8>> {
     use rayon::prelude::*;
     let count = batch_size(batches);
     debug!("CPU ECDSA for {}", batch_size(batches));
     let rv = batches
         .into_par_iter()
-        .map(|p| {
-            p.read()
-                .unwrap()
-                .packets
-                .par_iter()
-                .map(verify_packet)
-                .collect()
-        })
+        .map(|p| p.packets.par_iter().map(verify_packet).collect())
         .collect();
     inc_new_counter_info!("ed25519_verify_cpu", count);
     rv
 }
 
-pub fn ed25519_verify_disabled(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
+pub fn ed25519_verify_disabled(batches: &[Packets]) -> Vec<Vec<u8>> {
     use rayon::prelude::*;
     let count = batch_size(batches);
     debug!("disabled ECDSA for {}", batch_size(batches));
     let rv = batches
         .into_par_iter()
-        .map(|p| {
-            p.read()
-                .unwrap()
-                .packets
-                .par_iter()
-                .map(verify_packet_disabled)
-                .collect()
-        })
+        .map(|p| p.packets.par_iter().map(verify_packet_disabled).collect())
         .collect();
     inc_new_counter_info!("ed25519_verify_disabled", count);
     rv
@@ -235,7 +218,7 @@ pub fn init() {
 }
 
 #[cfg(feature = "cuda")]
-pub fn ed25519_verify(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
+pub fn ed25519_verify(batches: &[Packets]) -> Vec<Vec<u8>> {
     use crate::packet::PACKET_DATA_SIZE;
     let count = batch_size(batches);
@@ -254,14 +237,10 @@ pub fn ed25519_verify(batches: &[SharedPackets]) -> Vec<Vec<u8>> {
     debug!("CUDA ECDSA for {}", batch_size(batches));
     let mut out = Vec::new();
     let mut elems = Vec::new();
-    let mut locks = Vec::new();
     let mut rvs = Vec::new();
 
-    for packets in batches {
-        locks.push(packets.read().unwrap());
-    }
     let mut num_packets = 0;
-    for p in locks {
+    for p in batches {
         elems.push(Elems {
             elems: p.packets.as_ptr(),
             num: p.packets.len() as u32,
@@ -327,7 +306,7 @@ pub fn make_packet_from_transaction(tx: Transaction) -> Packet {
 
 #[cfg(test)]
 mod tests {
-    use crate::packet::{Packet, SharedPackets};
+    use crate::packet::{Packet, Packets};
     use crate::sigverify;
     use crate::test_tx::{test_multisig_tx, test_tx};
     use bincode::{deserialize, serialize};
@@ -440,20 +419,16 @@ mod tests {
         packet: &Packet,
         num_packets_per_batch: usize,
         num_batches: usize,
-    ) -> Vec<SharedPackets> {
+    ) -> Vec<Packets> {
         // generate packet vector
         let batches: Vec<_> = (0..num_batches)
             .map(|_| {
-                let packets = SharedPackets::default();
-                packets
-                    .write()
-                    .unwrap()
-                    .packets
-                    .resize(0, Packet::default());
+                let mut packets = Packets::default();
+                packets.packets.resize(0, Packet::default());
                 for _ in 0..num_packets_per_batch {
-                    packets.write().unwrap().packets.push(packet.clone());
+                    packets.packets.push(packet.clone());
                 }
-                assert_eq!(packets.read().unwrap().packets.len(), num_packets_per_batch);
+                assert_eq!(packets.packets.len(), num_packets_per_batch);
                 packets
             })
             .collect();
@@ -505,11 +480,11 @@ mod tests {
         let n = 4;
         let num_batches = 3;
-        let batches = generate_packet_vec(&packet, n, num_batches);
+        let mut batches = generate_packet_vec(&packet, n, num_batches);
 
         packet.data[40] = packet.data[40].wrapping_add(8);
 
-        batches[0].write().unwrap().packets.push(packet);
+        batches[0].packets.push(packet);
 
         // verify packets
         let ans = sigverify::ed25519_verify(&batches);

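The sigverify changes are mechanical: each `&[SharedPackets]` parameter becomes `&[Packets]`, and the per-batch lock dance collapses into direct iteration. A minimal sketch of the lock-free shape with stand-in types; the real `verify_packet` checks the ed25519 signature while this one just returns 1, and the real CPU path fans out with rayon's `par_iter`:

#[derive(Debug, Default, Clone)]
struct Packet;

#[derive(Debug, Default, Clone)]
struct Packets {
    packets: Vec<Packet>,
}

fn verify_packet(_p: &Packet) -> u8 {
    1 // stand-in result: 1 = signature accepted
}

fn batch_size(batches: &[Packets]) -> usize {
    batches.iter().map(|p| p.packets.len()).sum()
}

// Serial stand-in for ed25519_verify_cpu.
fn verify_cpu(batches: &[Packets]) -> Vec<Vec<u8>> {
    batches
        .iter()
        .map(|p| p.packets.iter().map(verify_packet).collect())
        .collect()
}

fn main() {
    let batches = vec![Packets { packets: vec![Packet; 3] }; 2];
    assert_eq!(batch_size(&batches), 6);
    assert_eq!(verify_cpu(&batches), vec![vec![1, 1, 1]; 2]);
}
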
View File

@@ -5,7 +5,7 @@
 //! transaction. All processing is done on the CPU by default and on a GPU
 //! if the `cuda` feature is enabled with `--features=cuda`.
 
-use crate::packet::SharedPackets;
+use crate::packet::Packets;
 use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::sigverify;
@@ -18,7 +18,7 @@ use std::sync::{Arc, Mutex};
 use std::thread::{self, Builder, JoinHandle};
 use std::time::Instant;
 
-pub type VerifiedPackets = Vec<(SharedPackets, Vec<u8>)>;
+pub type VerifiedPackets = Vec<(Packets, Vec<u8>)>;
 
 pub struct SigVerifyStage {
     thread_hdls: Vec<JoinHandle<()>>,
@@ -27,7 +27,7 @@ pub struct SigVerifyStage {
 impl SigVerifyStage {
     #[allow(clippy::new_ret_no_self)]
     pub fn new(
-        packet_receiver: Receiver<SharedPackets>,
+        packet_receiver: Receiver<Packets>,
         sigverify_disabled: bool,
         verified_sender: Sender<VerifiedPackets>,
     ) -> Self {
@@ -37,7 +37,7 @@ impl SigVerifyStage {
         Self { thread_hdls }
     }
 
-    fn verify_batch(batch: Vec<SharedPackets>, sigverify_disabled: bool) -> VerifiedPackets {
+    fn verify_batch(batch: Vec<Packets>, sigverify_disabled: bool) -> VerifiedPackets {
         let r = if sigverify_disabled {
             sigverify::ed25519_verify_disabled(&batch)
         } else {

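The same substitution flows through the channel types here and in the streamer below: `SigVerifyStage::new` takes a `Receiver<Packets>`, and `PacketSender`/`PacketReceiver` move batches by value through `std::sync::mpsc` instead of sharing `Arc<RwLock<..>>` handles. A sketch of that ownership handoff:

use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::spawn;

#[derive(Debug, Default, Clone)]
struct Packets {
    packets: Vec<u8>,
}

// Mirrors the new aliases: the channel owns the batch, no lock wrapper.
type PacketSender = Sender<Packets>;
type PacketReceiver = Receiver<Packets>;

fn main() {
    let (s, r): (PacketSender, PacketReceiver) = channel();
    let t = spawn(move || {
        let mut msgs = Packets::default();
        msgs.packets.resize(8, 0);
        s.send(msgs).unwrap(); // ownership moves into the channel
    });
    let msgs = r.recv().unwrap(); // the receiver now owns the batch outright
    assert_eq!(msgs.packets.len(), 8);
    t.join().unwrap();
}
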
View File

@@ -2,7 +2,7 @@
 //!
 
 use crate::packet::{
-    deserialize_packets_in_blob, Blob, Meta, Packets, SharedBlobs, SharedPackets, PACKET_DATA_SIZE,
+    deserialize_packets_in_blob, Blob, Meta, Packets, SharedBlobs, PACKET_DATA_SIZE,
 };
 use crate::result::{Error, Result};
 use bincode;
@@ -10,12 +10,12 @@ use solana_sdk::timing::duration_as_ms;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 use std::thread::{Builder, JoinHandle};
 use std::time::{Duration, Instant};
 
-pub type PacketReceiver = Receiver<SharedPackets>;
-pub type PacketSender = Sender<SharedPackets>;
+pub type PacketReceiver = Receiver<Packets>;
+pub type PacketSender = Sender<Packets>;
 pub type BlobSender = Sender<SharedBlobs>;
 pub type BlobReceiver = Receiver<SharedBlobs>;
@@ -29,7 +29,7 @@ fn recv_loop(sock: &UdpSocket, exit: Arc<AtomicBool>, channel: &PacketSender) ->
             return Ok(());
         }
         if let Ok(_len) = msgs.recv_from(sock) {
-            channel.send(Arc::new(RwLock::new(msgs)))?;
+            channel.send(msgs)?;
             break;
         }
     }
@@ -61,16 +61,16 @@ fn recv_send(sock: &UdpSocket, r: &BlobReceiver) -> Result<()> {
     Ok(())
 }
 
-pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize, u64)> {
+pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<Packets>, usize, u64)> {
     let timer = Duration::new(1, 0);
     let msgs = recvr.recv_timeout(timer)?;
     let recv_start = Instant::now();
     trace!("got msgs");
-    let mut len = msgs.read().unwrap().packets.len();
+    let mut len = msgs.packets.len();
     let mut batch = vec![msgs];
     while let Ok(more) = recvr.try_recv() {
         trace!("got more msgs");
-        len += more.read().unwrap().packets.len();
+        len += more.packets.len();
         batch.push(more);
 
         if len > 100_000 {
@@ -154,7 +154,7 @@ fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> {
         }
 
         let packets = packets?;
-        s.send(Arc::new(RwLock::new(Packets::new(packets))))?;
+        s.send(Packets::new(packets))?;
     }
 
     Ok(())
@@ -199,7 +199,7 @@ mod test {
         for _ in 0..10 {
             let m = r.recv_timeout(Duration::new(1, 0))?;
-            *num -= m.read().unwrap().packets.len();
+            *num -= m.packets.len();
             if *num == 0 {
                 break;