Read multiple entries in write stage (#1259)

- Also use rayon to parallelize to_blobs() to maximize CPU usage
This commit is contained in:
Pankaj Garg 2018-09-18 21:45:49 -07:00 committed by GitHub
parent 6dee632d67
commit 0ee6c5bf9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 85 additions and 22 deletions

View File

@ -7,7 +7,8 @@ use entry::Entry;
use erasure;
use ledger::Block;
use log::Level;
use packet::BlobRecycler;
use packet::{BlobRecycler, SharedBlobs};
use rayon::prelude::*;
use result::{Error, Result};
use service::Service;
use std::net::UdpSocket;
@ -15,7 +16,8 @@ use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::time::{Duration, Instant};
use timing::duration_as_ms;
use window::{self, SharedWindow, WindowIndex, WindowUtil, WINDOW_SIZE};
fn broadcast(
@ -31,20 +33,34 @@ fn broadcast(
let id = node_info.id;
let timer = Duration::new(1, 0);
let entries = receiver.recv_timeout(timer)?;
let mut dq = entries.to_blobs(recycler);
let mut num_entries = entries.len();
let mut ventries = Vec::new();
ventries.push(entries);
while let Ok(entries) = receiver.try_recv() {
dq.append(&mut entries.to_blobs(recycler));
num_entries += entries.len();
ventries.push(entries);
}
let to_blobs_start = Instant::now();
let dq: SharedBlobs = ventries
.into_par_iter()
.flat_map(|p| p.to_blobs(recycler))
.collect();
let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
// flatten deque to vec
let blobs_vec: Vec<_> = dq.into_iter().collect();
let blobs_chunking = Instant::now();
// We could receive more blobs than window slots so
// break them up into window-sized chunks to process
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());
let chunking_elapsed = duration_as_ms(&blobs_chunking.elapsed());
trace!("{}", window.read().unwrap().print(&id, *receive_index));
let broadcast_start = Instant::now();
for mut blobs in blobs_chunked {
let blobs_len = blobs.len();
trace!("{}: broadcast blobs.len: {}", id, blobs_len);
@ -116,6 +132,13 @@ fn broadcast(
*receive_index,
)?;
}
let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
info!(
"broadcast: {} entries, blob time {} chunking time {} broadcast time {}",
num_entries, to_blobs_elapsed, chunking_elapsed, broadcast_elapsed
);
Ok(())
}

View File

@ -17,8 +17,9 @@ use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::time::{Duration, Instant};
use streamer::responder;
use timing::{duration_as_ms, duration_as_s};
use vote_stage::send_leader_vote;
pub struct WriteStage {
@ -34,26 +35,65 @@ impl WriteStage {
entry_sender: &Sender<Vec<Entry>>,
entry_receiver: &Receiver<Vec<Entry>>,
) -> Result<()> {
let mut ventries = Vec::new();
let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
let mut num_entries = entries.len();
let mut num_txs = 0;
let votes = &entries.votes();
crdt.write().unwrap().insert_votes(&votes);
ledger_writer.write_entries(entries.clone())?;
inc_new_counter_info!("write_stage-write_entries", entries.len());
//TODO(anatoly): real stake based voting needs to change this
//leader simply votes if the current set of validators have voted
//on a valid last id
trace!("New entries? {}", entries.len());
if !entries.is_empty() {
inc_new_counter_info!("write_stage-recv_vote", votes.len());
inc_new_counter_info!("write_stage-broadcast_entries", entries.len());
trace!("broadcasting {}", entries.len());
entry_sender.send(entries)?;
ventries.push(entries);
while let Ok(more) = entry_receiver.try_recv() {
num_entries += more.len();
ventries.push(more);
}
info!("write_stage entries: {}", num_entries);
let to_blobs_total = 0;
let mut blob_send_total = 0;
let mut register_entry_total = 0;
let mut crdt_votes_total = 0;
let start = Instant::now();
for _ in 0..ventries.len() {
let entries = ventries.pop().unwrap();
for e in entries.iter() {
num_txs += e.transactions.len();
}
let crdt_votes_start = Instant::now();
let votes = &entries.votes();
crdt.write().unwrap().insert_votes(&votes);
crdt_votes_total += duration_as_ms(&crdt_votes_start.elapsed());
ledger_writer.write_entries(entries.clone())?;
let register_entry_start = Instant::now();
register_entry_total += duration_as_ms(&register_entry_start.elapsed());
inc_new_counter_info!("write_stage-write_entries", entries.len());
//TODO(anatoly): real stake based voting needs to change this
//leader simply votes if the current set of validators have voted
//on a valid last id
trace!("New entries? {}", entries.len());
let blob_send_start = Instant::now();
if !entries.is_empty() {
inc_new_counter_info!("write_stage-recv_vote", votes.len());
inc_new_counter_info!("write_stage-broadcast_entries", entries.len());
trace!("broadcasting {}", entries.len());
entry_sender.send(entries)?;
}
blob_send_total += duration_as_ms(&blob_send_start.elapsed());
}
info!("done write_stage txs: {} time {} ms txs/s: {} to_blobs_total: {} register_entry_total: {} blob_send_total: {} crdt_votes_total: {}",
num_txs, duration_as_ms(&start.elapsed()),
num_txs as f32 / duration_as_s(&start.elapsed()),
to_blobs_total,
register_entry_total,
blob_send_total,
crdt_votes_total);
Ok(())
}