From 751dd7eebb05e16e63cfa07d4f0f1e5fcdfd61da Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Mon, 10 Sep 2018 09:32:52 -0600 Subject: [PATCH] Move vote into ReplicateStage after process_entries --- src/replicate_stage.rs | 39 +++++++++++++++++++++++---------------- src/tvu.rs | 1 - src/vote_stage.rs | 2 +- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index cce3baeffd..be246bfe14 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -6,23 +6,22 @@ use crdt::Crdt; use entry::EntryReceiver; use ledger::{Block, LedgerWriter}; use log::Level; +use packet::BlobRecycler; use result::{Error, Result}; use service::Service; use signature::Keypair; use std::net::UdpSocket; -use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::channel; use std::sync::mpsc::RecvTimeoutError; use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; -use streamer::responder; -use vote_stage::VoteStage; +use streamer::{responder, BlobSender}; +use vote_stage::send_validator_vote; pub struct ReplicateStage { thread_hdls: Vec>, - vote_stage: VoteStage, } impl ReplicateStage { @@ -30,8 +29,11 @@ impl ReplicateStage { fn replicate_requests( bank: &Arc, crdt: &Arc>, + blob_recycler: &BlobRecycler, window_receiver: &EntryReceiver, ledger_writer: Option<&mut LedgerWriter>, + keypair: &Arc, + vote_blob_sender: &BlobSender, ) -> Result<()> { let timer = Duration::new(1, 0); //coalesce all the available entries into a single vote @@ -42,6 +44,11 @@ impl ReplicateStage { let res = bank.process_entries(&entries); + if let Err(err) = send_validator_vote(bank, keypair, crdt, blob_recycler, vote_blob_sender) + { + info!("Vote failed: {:?}", err); + } + { let mut wcrdt = crdt.write().unwrap(); wcrdt.insert_votes(&entries.votes()); @@ -66,23 +73,27 @@ impl ReplicateStage { crdt: Arc>, window_receiver: EntryReceiver, ledger_path: Option<&str>, - exit: Arc, ) -> Self { let (vote_blob_sender, vote_blob_receiver) = channel(); let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); let t_responder = responder("replicate_stage", Arc::new(send), vote_blob_receiver); - let vote_stage = - VoteStage::new(keypair, bank.clone(), crdt.clone(), vote_blob_sender, exit); - let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap()); + let keypair = Arc::new(keypair); + let blob_recycler = BlobRecycler::default(); let t_replicate = Builder::new() .name("solana-replicate-stage".to_string()) .spawn(move || loop { - if let Err(e) = - Self::replicate_requests(&bank, &crdt, &window_receiver, ledger_writer.as_mut()) - { + if let Err(e) = Self::replicate_requests( + &bank, + &crdt, + &blob_recycler, + &window_receiver, + ledger_writer.as_mut(), + &keypair, + &vote_blob_sender, + ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), @@ -93,10 +104,7 @@ impl ReplicateStage { let thread_hdls = vec![t_responder, t_replicate]; - ReplicateStage { - thread_hdls, - vote_stage, - } + ReplicateStage { thread_hdls } } } @@ -107,7 +115,6 @@ impl Service for ReplicateStage { for thread_hdl in self.thread_hdls { thread_hdl.join()?; } - self.vote_stage.join()?; Ok(()) } } diff --git a/src/tvu.rs b/src/tvu.rs index f16ab087a2..236176b50c 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -104,7 +104,6 @@ impl Tvu { crdt, blob_window_receiver, ledger_path, - exit, ); Tvu { diff --git a/src/vote_stage.rs b/src/vote_stage.rs index 57317a8cba..d51fc027bb 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -153,7 +153,7 @@ pub fn send_leader_vote( Ok(()) } -fn send_validator_vote( +pub fn send_validator_vote( bank: &Arc, keypair: &Arc, crdt: &Arc>,