diff --git a/Cargo.toml b/Cargo.toml index 4317a31b1..6cf9607b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,3 +68,4 @@ libc = "^0.2.1" getopts = "^0.2" isatty = "0.1" futures = "0.1" +rand = "0.4.2" diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 18ce6cb6b..b4645dc89 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -17,7 +17,6 @@ use recorder::Signal; use result::Result; use serde_json; use signature::PublicKey; -use std::cmp::max; use std::collections::VecDeque; use std::io::sink; use std::io::{Cursor, Write}; @@ -30,6 +29,9 @@ use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; use transaction::Transaction; +use timing; +use std::time::Instant; +use rand::{thread_rng, Rng}; pub struct AccountantSkel { acc: Mutex, @@ -256,41 +258,64 @@ impl AccountantSkel { } } - fn recv_batch(recvr: &streamer::PacketReceiver) -> Result> { + fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec, usize)> { let timer = Duration::new(1, 0); let msgs = recvr.recv_timeout(timer)?; debug!("got msgs"); + let mut len = msgs.read().unwrap().packets.len(); let mut batch = vec![msgs]; while let Ok(more) = recvr.try_recv() { - debug!("got more msgs"); + trace!("got more msgs"); + len += more.read().unwrap().packets.len(); batch.push(more); + + if len > 100_000 { + break; + } } - info!("batch len {}", batch.len()); - Ok(batch) + debug!("batch len {}", batch.len()); + Ok((batch, len)) } - fn verify_batch(batch: Vec) -> Vec)>> { - let chunk_size = max(1, (batch.len() + 3) / 4); - let batches: Vec<_> = batch.chunks(chunk_size).map(|x| x.to_vec()).collect(); - batches - .into_par_iter() - .map(|batch| { - let r = ecdsa::ed25519_verify(&batch); - batch.into_iter().zip(r).collect() - }) - .collect() + fn verify_batch( + batch: Vec, + sendr: &Arc)>>>>, + ) -> Result<()> { + let r = ecdsa::ed25519_verify(&batch); + let res = batch.into_iter().zip(r).collect(); + sendr.lock().unwrap().send(res)?; + // TODO: fix error handling here? + Ok(()) } fn verifier( - recvr: &streamer::PacketReceiver, - sendr: &Sender)>>, + recvr: &Arc>, + sendr: &Arc)>>>>, ) -> Result<()> { - let batch = Self::recv_batch(recvr)?; - let verified_batches = Self::verify_batch(batch); - debug!("verified batches: {}", verified_batches.len()); - for xs in verified_batches { - sendr.send(xs)?; - } + let (batch, len) = Self::recv_batch(&recvr.lock().unwrap())?; + let now = Instant::now(); + let batch_len = batch.len(); + let rand_id = thread_rng().gen_range(0, 100); + info!( + "@{:?} verifier: verifying: {} id: {}", + timing::timestamp(), + batch.len(), + rand_id + ); + + Self::verify_batch(batch, sendr).unwrap(); + + let total_time_ms = timing::duration_as_ms(&now.elapsed()); + let total_time_s = timing::duration_as_s(&now.elapsed()); + info!( + "@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}", + timing::timestamp(), + batch_len, + total_time_ms, + rand_id, + len, + (len as f32 / total_time_s) + ); Ok(()) } @@ -335,8 +360,6 @@ impl AccountantSkel { } } - debug!("processing verified"); - // Let validators know they should not attempt to process additional // transactions in parallel. self.historian_input.lock().unwrap().send(Signal::Tick)?; @@ -387,16 +410,25 @@ impl AccountantSkel { blob_recycler: &packet::BlobRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); + let recv_start = Instant::now(); let mms = verified_receiver.recv_timeout(timer)?; - debug!("got some messages: {}", mms.len()); + let mut reqs_len = 0; + let mms_len = mms.len(); + info!( + "@{:?} process start stalled for: {:?}ms batches: {}", + timing::timestamp(), + timing::duration_as_ms(&recv_start.elapsed()), + mms.len(), + ); + let proc_start = Instant::now(); for (msgs, vers) in mms { let reqs = Self::deserialize_packets(&msgs.read().unwrap()); + reqs_len += reqs.len(); let req_vers = reqs.into_iter() .zip(vers) .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) .filter(|x| { let v = x.0.verify(); - trace!("v:{} x:{:?}", v, x); v }) .collect(); @@ -421,7 +453,16 @@ impl AccountantSkel { } packet_recycler.recycle(msgs); } - debug!("done responding"); + let total_time_s = timing::duration_as_s(&proc_start.elapsed()); + let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); + info!( + "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", + timing::timestamp(), + mms_len, + total_time_ms, + reqs_len, + (reqs_len as f32) / (total_time_s) + ); Ok(()) } /// Process verified blobs, already in order @@ -486,13 +527,21 @@ impl AccountantSkel { ); let (verified_sender, verified_receiver) = channel(); - let exit_ = exit.clone(); - let t_verifier = spawn(move || loop { - let e = Self::verifier(&packet_receiver, &verified_sender); - if e.is_err() && exit_.load(Ordering::Relaxed) { - break; - } - }); + let mut verify_threads = Vec::new(); + let shared_verified_sender = Arc::new(Mutex::new(verified_sender)); + let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver)); + for _ in 0..4 { + let exit_ = exit.clone(); + let recv = shared_packet_receiver.clone(); + let sender = shared_verified_sender.clone(); + let thread = spawn(move || loop { + let e = Self::verifier(&recv, &sender); + if e.is_err() && exit_.load(Ordering::Relaxed) { + break; + } + }); + verify_threads.push(thread); + } let (broadcast_sender, broadcast_receiver) = channel(); @@ -528,16 +577,18 @@ impl AccountantSkel { } } }); - Ok(vec![ + + let mut threads = vec![ t_receiver, t_responder, t_server, - t_verifier, t_sync, t_gossip, t_listen, t_broadcast, - ]) + ]; + threads.extend(verify_threads.into_iter()); + Ok(threads) } /// This service receives messages from a leader in the network and processes the transactions @@ -639,15 +690,21 @@ impl AccountantSkel { ); let (verified_sender, verified_receiver) = channel(); - let exit_ = exit.clone(); - let t_verifier = spawn(move || loop { - let e = Self::verifier(&packet_receiver, &verified_sender); - if e.is_err() && exit_.load(Ordering::Relaxed) { - trace!("verifier exiting"); - break; - } - }); - + let mut verify_threads = Vec::new(); + let shared_verified_sender = Arc::new(Mutex::new(verified_sender)); + let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver)); + for _ in 0..4 { + let exit_ = exit.clone(); + let recv = shared_packet_receiver.clone(); + let sender = shared_verified_sender.clone(); + let thread = spawn(move || loop { + let e = Self::verifier(&recv, &sender); + if e.is_err() && exit_.load(Ordering::Relaxed) { + break; + } + }); + verify_threads.push(thread); + } let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone()); let skel = obj.clone(); @@ -667,7 +724,7 @@ impl AccountantSkel { } }); - Ok(vec![ + let mut threads = vec![ //replicate threads t_blob_receiver, t_retransmit, @@ -679,9 +736,10 @@ impl AccountantSkel { t_packet_receiver, t_responder, t_server, - t_verifier, t_sync, - ]) + ]; + threads.extend(verify_threads.into_iter()); + Ok(threads) } } diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index d5356083b..c0fc43bf1 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -161,6 +161,7 @@ mod tests { use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; + use std::time::Instant; // TODO: Figure out why this test sometimes hangs on TravisCI. #[test] @@ -193,7 +194,18 @@ mod tests { let last_id = acc.get_last_id().wait().unwrap(); let _sig = acc.transfer(500, &alice.keypair(), bob_pubkey, &last_id) .unwrap(); - assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 500); + let mut balance; + let now = Instant::now(); + loop { + balance = acc.get_balance(&bob_pubkey); + if balance.is_ok() { + break; + } + if now.elapsed().as_secs() > 0 { + break; + } + } + assert_eq!(balance.unwrap(), 500); exit.store(true, Ordering::Relaxed); for t in threads { t.join().unwrap(); diff --git a/src/lib.rs b/src/lib.rs index 75c9b65f8..b316a79c9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ pub mod result; pub mod signature; pub mod streamer; pub mod transaction; +pub mod timing; extern crate bincode; extern crate byteorder; extern crate chrono; @@ -41,3 +42,5 @@ extern crate futures; #[cfg(test)] #[macro_use] extern crate matches; + +extern crate rand; diff --git a/src/timing.rs b/src/timing.rs new file mode 100644 index 000000000..5c36fad80 --- /dev/null +++ b/src/timing.rs @@ -0,0 +1,15 @@ +use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::Duration; + +pub fn duration_as_ms(d: &Duration) -> u64 { + return (d.as_secs() * 1000) + (d.subsec_nanos() as u64 / 1_000_000); +} + +pub fn duration_as_s(d: &Duration) -> f32 { + return d.as_secs() as f32 + (d.subsec_nanos() as f32 / 1_000_000_000.0); +} + +pub fn timestamp() -> u64 { + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + return duration_as_ms(&now); +}