Rework sig processing threads and add perf for process/verify

This commit is contained in:
Stephen Akridge 2018-05-07 16:49:15 -07:00
parent 6f3ec8d21f
commit bd0671e123
5 changed files with 140 additions and 51 deletions

View File

@ -68,3 +68,4 @@ libc = "^0.2.1"
getopts = "^0.2"
isatty = "0.1"
futures = "0.1"
rand = "0.4.2"

View File

@ -17,7 +17,6 @@ use recorder::Signal;
use result::Result;
use serde_json;
use signature::PublicKey;
use std::cmp::max;
use std::collections::VecDeque;
use std::io::sink;
use std::io::{Cursor, Write};
@ -30,6 +29,9 @@ use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;
use transaction::Transaction;
use timing;
use std::time::Instant;
use rand::{thread_rng, Rng};
pub struct AccountantSkel {
acc: Mutex<Accountant>,
@ -256,41 +258,64 @@ impl AccountantSkel {
}
}
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<Vec<SharedPackets>> {
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
debug!("got msgs");
let mut len = msgs.read().unwrap().packets.len();
let mut batch = vec![msgs];
while let Ok(more) = recvr.try_recv() {
debug!("got more msgs");
trace!("got more msgs");
len += more.read().unwrap().packets.len();
batch.push(more);
if len > 100_000 {
break;
}
}
info!("batch len {}", batch.len());
Ok(batch)
debug!("batch len {}", batch.len());
Ok((batch, len))
}
fn verify_batch(batch: Vec<SharedPackets>) -> Vec<Vec<(SharedPackets, Vec<u8>)>> {
let chunk_size = max(1, (batch.len() + 3) / 4);
let batches: Vec<_> = batch.chunks(chunk_size).map(|x| x.to_vec()).collect();
batches
.into_par_iter()
.map(|batch| {
let r = ecdsa::ed25519_verify(&batch);
batch.into_iter().zip(r).collect()
})
.collect()
fn verify_batch(
batch: Vec<SharedPackets>,
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> Result<()> {
let r = ecdsa::ed25519_verify(&batch);
let res = batch.into_iter().zip(r).collect();
sendr.lock().unwrap().send(res)?;
// TODO: fix error handling here?
Ok(())
}
fn verifier(
recvr: &streamer::PacketReceiver,
sendr: &Sender<Vec<(SharedPackets, Vec<u8>)>>,
recvr: &Arc<Mutex<streamer::PacketReceiver>>,
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> Result<()> {
let batch = Self::recv_batch(recvr)?;
let verified_batches = Self::verify_batch(batch);
debug!("verified batches: {}", verified_batches.len());
for xs in verified_batches {
sendr.send(xs)?;
}
let (batch, len) = Self::recv_batch(&recvr.lock().unwrap())?;
let now = Instant::now();
let batch_len = batch.len();
let rand_id = thread_rng().gen_range(0, 100);
info!(
"@{:?} verifier: verifying: {} id: {}",
timing::timestamp(),
batch.len(),
rand_id
);
Self::verify_batch(batch, sendr).unwrap();
let total_time_ms = timing::duration_as_ms(&now.elapsed());
let total_time_s = timing::duration_as_s(&now.elapsed());
info!(
"@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}",
timing::timestamp(),
batch_len,
total_time_ms,
rand_id,
len,
(len as f32 / total_time_s)
);
Ok(())
}
@ -335,8 +360,6 @@ impl AccountantSkel {
}
}
debug!("processing verified");
// Let validators know they should not attempt to process additional
// transactions in parallel.
self.historian_input.lock().unwrap().send(Signal::Tick)?;
@ -387,16 +410,25 @@ impl AccountantSkel {
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let recv_start = Instant::now();
let mms = verified_receiver.recv_timeout(timer)?;
debug!("got some messages: {}", mms.len());
let mut reqs_len = 0;
let mms_len = mms.len();
info!(
"@{:?} process start stalled for: {:?}ms batches: {}",
timing::timestamp(),
timing::duration_as_ms(&recv_start.elapsed()),
mms.len(),
);
let proc_start = Instant::now();
for (msgs, vers) in mms {
let reqs = Self::deserialize_packets(&msgs.read().unwrap());
reqs_len += reqs.len();
let req_vers = reqs.into_iter()
.zip(vers)
.filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver)))
.filter(|x| {
let v = x.0.verify();
trace!("v:{} x:{:?}", v, x);
v
})
.collect();
@ -421,7 +453,16 @@ impl AccountantSkel {
}
packet_recycler.recycle(msgs);
}
debug!("done responding");
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
info!(
"@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
timing::timestamp(),
mms_len,
total_time_ms,
reqs_len,
(reqs_len as f32) / (total_time_s)
);
Ok(())
}
/// Process verified blobs, already in order
@ -486,13 +527,21 @@ impl AccountantSkel {
);
let (verified_sender, verified_receiver) = channel();
let exit_ = exit.clone();
let t_verifier = spawn(move || loop {
let e = Self::verifier(&packet_receiver, &verified_sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
let mut verify_threads = Vec::new();
let shared_verified_sender = Arc::new(Mutex::new(verified_sender));
let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver));
for _ in 0..4 {
let exit_ = exit.clone();
let recv = shared_packet_receiver.clone();
let sender = shared_verified_sender.clone();
let thread = spawn(move || loop {
let e = Self::verifier(&recv, &sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
verify_threads.push(thread);
}
let (broadcast_sender, broadcast_receiver) = channel();
@ -528,16 +577,18 @@ impl AccountantSkel {
}
}
});
Ok(vec![
let mut threads = vec![
t_receiver,
t_responder,
t_server,
t_verifier,
t_sync,
t_gossip,
t_listen,
t_broadcast,
])
];
threads.extend(verify_threads.into_iter());
Ok(threads)
}
/// This service receives messages from a leader in the network and processes the transactions
@ -639,15 +690,21 @@ impl AccountantSkel {
);
let (verified_sender, verified_receiver) = channel();
let exit_ = exit.clone();
let t_verifier = spawn(move || loop {
let e = Self::verifier(&packet_receiver, &verified_sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
trace!("verifier exiting");
break;
}
});
let mut verify_threads = Vec::new();
let shared_verified_sender = Arc::new(Mutex::new(verified_sender));
let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver));
for _ in 0..4 {
let exit_ = exit.clone();
let recv = shared_packet_receiver.clone();
let sender = shared_verified_sender.clone();
let thread = spawn(move || loop {
let e = Self::verifier(&recv, &sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
verify_threads.push(thread);
}
let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone());
let skel = obj.clone();
@ -667,7 +724,7 @@ impl AccountantSkel {
}
});
Ok(vec![
let mut threads = vec![
//replicate threads
t_blob_receiver,
t_retransmit,
@ -679,9 +736,10 @@ impl AccountantSkel {
t_packet_receiver,
t_responder,
t_server,
t_verifier,
t_sync,
])
];
threads.extend(verify_threads.into_iter());
Ok(threads)
}
}

View File

@ -161,6 +161,7 @@ mod tests {
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use std::time::Instant;
// TODO: Figure out why this test sometimes hangs on TravisCI.
#[test]
@ -193,7 +194,18 @@ mod tests {
let last_id = acc.get_last_id().wait().unwrap();
let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
.unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500);
let mut balance;
let now = Instant::now();
loop {
balance = acc.get_balance(&bob_pubkey);
if balance.is_ok() {
break;
}
if now.elapsed().as_secs() > 0 {
break;
}
}
assert_eq!(balance.unwrap(), 500);
exit.store(true, Ordering::Relaxed);
for t in threads {
t.join().unwrap();

View File

@ -20,6 +20,7 @@ pub mod result;
pub mod signature;
pub mod streamer;
pub mod transaction;
pub mod timing;
extern crate bincode;
extern crate byteorder;
extern crate chrono;
@ -41,3 +42,5 @@ extern crate futures;
#[cfg(test)]
#[macro_use]
extern crate matches;
extern crate rand;

15
src/timing.rs Normal file
View File

@ -0,0 +1,15 @@
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::Duration;
pub fn duration_as_ms(d: &Duration) -> u64 {
return (d.as_secs() * 1000) + (d.subsec_nanos() as u64 / 1_000_000);
}
pub fn duration_as_s(d: &Duration) -> f32 {
return d.as_secs() as f32 + (d.subsec_nanos() as f32 / 1_000_000_000.0);
}
pub fn timestamp() -> u64 {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
return duration_as_ms(&now);
}