From 0ee6c5bf9d8fbbc3c04f806b385ceb367328c3ff Mon Sep 17 00:00:00 2001
From: Pankaj Garg
Date: Tue, 18 Sep 2018 21:45:49 -0700
Subject: [PATCH] Read multiple entries in write stage (#1259)

- Also use rayon to parallelize to_blobs() to maximize CPU usage
---
 src/broadcast_stage.rs | 31 ++++++++++++++---
 src/write_stage.rs     | 76 ++++++++++++++++++++++++++++++++----------
 2 files changed, 85 insertions(+), 22 deletions(-)

diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs
index 975fb41fa..62b416f68 100644
--- a/src/broadcast_stage.rs
+++ b/src/broadcast_stage.rs
@@ -7,7 +7,8 @@ use entry::Entry;
 use erasure;
 use ledger::Block;
 use log::Level;
-use packet::BlobRecycler;
+use packet::{BlobRecycler, SharedBlobs};
+use rayon::prelude::*;
 use result::{Error, Result};
 use service::Service;
 use std::net::UdpSocket;
@@ -15,7 +16,8 @@ use std::sync::atomic::AtomicUsize;
 use std::sync::mpsc::{Receiver, RecvTimeoutError};
 use std::sync::{Arc, RwLock};
 use std::thread::{self, Builder, JoinHandle};
-use std::time::Duration;
+use std::time::{Duration, Instant};
+use timing::duration_as_ms;
 use window::{self, SharedWindow, WindowIndex, WindowUtil, WINDOW_SIZE};
 
 fn broadcast(
@@ -31,20 +33,34 @@ fn broadcast(
     let id = node_info.id;
     let timer = Duration::new(1, 0);
     let entries = receiver.recv_timeout(timer)?;
-    let mut dq = entries.to_blobs(recycler);
+    let mut num_entries = entries.len();
+    let mut ventries = Vec::new();
+    ventries.push(entries);
     while let Ok(entries) = receiver.try_recv() {
-        dq.append(&mut entries.to_blobs(recycler));
+        num_entries += entries.len();
+        ventries.push(entries);
     }
+    let to_blobs_start = Instant::now();
+    let dq: SharedBlobs = ventries
+        .into_par_iter()
+        .flat_map(|p| p.to_blobs(recycler))
+        .collect();
+
+    let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
 
     // flatten deque to vec
     let blobs_vec: Vec<_> = dq.into_iter().collect();
 
+    let blobs_chunking = Instant::now();
     // We could receive more blobs than window slots so
     // break them up into window-sized chunks to process
     let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());
+    let chunking_elapsed = duration_as_ms(&blobs_chunking.elapsed());
 
     trace!("{}", window.read().unwrap().print(&id, *receive_index));
 
+    let broadcast_start = Instant::now();
     for mut blobs in blobs_chunked {
         let blobs_len = blobs.len();
         trace!("{}: broadcast blobs.len: {}", id, blobs_len);
@@ -116,6 +132,13 @@ fn broadcast(
             *receive_index,
         )?;
     }
+    let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
+
+    info!(
+        "broadcast: {} entries, blob time {} chunking time {} broadcast time {}",
+        num_entries, to_blobs_elapsed, chunking_elapsed, broadcast_elapsed
+    );
+
     Ok(())
 }
 
diff --git a/src/write_stage.rs b/src/write_stage.rs
index f77c8e1ee..da36210be 100644
--- a/src/write_stage.rs
+++ b/src/write_stage.rs
@@ -17,8 +17,9 @@ use std::sync::atomic::AtomicUsize;
 use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
 use std::sync::{Arc, RwLock};
 use std::thread::{self, Builder, JoinHandle};
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use streamer::responder;
+use timing::{duration_as_ms, duration_as_s};
 use vote_stage::send_leader_vote;
 
 pub struct WriteStage {
@@ -34,26 +35,65 @@ impl WriteStage {
         entry_sender: &Sender<Vec<Entry>>,
         entry_receiver: &Receiver<Vec<Entry>>,
     ) -> Result<()> {
+        let mut ventries = Vec::new();
         let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
+        let mut num_entries = entries.len();
+        let mut num_txs = 0;
 
-        let votes = &entries.votes();
-        crdt.write().unwrap().insert_votes(&votes);
-
-        ledger_writer.write_entries(entries.clone())?;
-
-        inc_new_counter_info!("write_stage-write_entries", entries.len());
-
-        //TODO(anatoly): real stake based voting needs to change this
-        //leader simply votes if the current set of validators have voted
-        //on a valid last id
-
-        trace!("New entries? {}", entries.len());
-        if !entries.is_empty() {
-            inc_new_counter_info!("write_stage-recv_vote", votes.len());
-            inc_new_counter_info!("write_stage-broadcast_entries", entries.len());
-            trace!("broadcasting {}", entries.len());
-            entry_sender.send(entries)?;
+        ventries.push(entries);
+        while let Ok(more) = entry_receiver.try_recv() {
+            num_entries += more.len();
+            ventries.push(more);
         }
+
+        info!("write_stage entries: {}", num_entries);
+
+        let to_blobs_total = 0;
+        let mut blob_send_total = 0;
+        let mut register_entry_total = 0;
+        let mut crdt_votes_total = 0;
+
+        let start = Instant::now();
+        for _ in 0..ventries.len() {
+            let entries = ventries.pop().unwrap();
+            for e in entries.iter() {
+                num_txs += e.transactions.len();
+            }
+            let crdt_votes_start = Instant::now();
+            let votes = &entries.votes();
+            crdt.write().unwrap().insert_votes(&votes);
+            crdt_votes_total += duration_as_ms(&crdt_votes_start.elapsed());
+
+            ledger_writer.write_entries(entries.clone())?;
+
+            let register_entry_start = Instant::now();
+            register_entry_total += duration_as_ms(&register_entry_start.elapsed());
+
+            inc_new_counter_info!("write_stage-write_entries", entries.len());
+
+            //TODO(anatoly): real stake based voting needs to change this
+            //leader simply votes if the current set of validators have voted
+            //on a valid last id
+
+            trace!("New entries? {}", entries.len());
+            let blob_send_start = Instant::now();
+            if !entries.is_empty() {
+                inc_new_counter_info!("write_stage-recv_vote", votes.len());
+                inc_new_counter_info!("write_stage-broadcast_entries", entries.len());
+                trace!("broadcasting {}", entries.len());
+                entry_sender.send(entries)?;
+            }
+
+            blob_send_total += duration_as_ms(&blob_send_start.elapsed());
+        }
+        info!("done write_stage txs: {} time {} ms txs/s: {} to_blobs_total: {} register_entry_total: {} blob_send_total: {} crdt_votes_total: {}",
+              num_txs, duration_as_ms(&start.elapsed()),
+              num_txs as f32 / duration_as_s(&start.elapsed()),
+              to_blobs_total,
+              register_entry_total,
+              blob_send_total,
+              crdt_votes_total);
+
         Ok(())
     }
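
A note on the shared pattern in both hunks: take the first batch with a blocking recv_timeout, drain anything else already queued with try_recv, and only then do the heavy per-batch work; broadcast_stage.rs additionally fans the batches out across rayon with into_par_iter().flat_map(..). Below is a minimal, self-contained sketch of that drain-then-parallelize shape, assuming a rayon dependency is available; Entry, Blob, to_blobs, and drain_and_convert are simplified stand-ins for illustration, not the types or helpers used in this repository.

// Sketch only: `Entry`, `Blob`, and `to_blobs` are simplified stand-ins,
// not the types used in src/broadcast_stage.rs or src/write_stage.rs.
use rayon::prelude::*;
use std::sync::mpsc::{channel, Receiver};
use std::time::Duration;

struct Entry(u64); // stand-in for a ledger entry
struct Blob(Vec<u8>); // stand-in for a serialized blob

// Stand-in for `entries.to_blobs(recycler)`: serialize one batch of entries.
fn to_blobs(entries: Vec<Entry>) -> Vec<Blob> {
    entries
        .into_iter()
        .map(|e| Blob(e.0.to_le_bytes().to_vec()))
        .collect()
}

// Drain the channel into batches, then convert every batch in parallel.
fn drain_and_convert(receiver: &Receiver<Vec<Entry>>) -> Option<Vec<Blob>> {
    // Block briefly for the first batch, like `recv_timeout(Duration::new(1, 0))?`.
    let first = receiver.recv_timeout(Duration::new(1, 0)).ok()?;
    let mut ventries = vec![first];

    // Pick up anything else already queued, mirroring the `try_recv` loop.
    while let Ok(more) = receiver.try_recv() {
        ventries.push(more);
    }

    // One rayon task per batch, flattened into a single Vec of blobs,
    // mirroring `ventries.into_par_iter().flat_map(|p| p.to_blobs(recycler))`.
    let blobs: Vec<Blob> = ventries.into_par_iter().flat_map(to_blobs).collect();
    Some(blobs)
}

fn main() {
    let (sender, receiver) = channel();
    for i in 0..4u64 {
        sender.send(vec![Entry(i), Entry(i + 10)]).unwrap();
    }
    if let Some(blobs) = drain_and_convert(&receiver) {
        println!("converted {} blobs", blobs.len());
    }
}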