Validators now vote once a second regardless

This commit is contained in:
Michael Vines 2018-07-19 21:27:35 -07:00
parent 80a02359f7
commit 7672506b45
5 changed files with 199 additions and 73 deletions

View File

@ -49,6 +49,7 @@ pub mod timing;
pub mod tpu;
pub mod transaction;
pub mod tvu;
pub mod vote_stage;
pub mod voting;
pub mod window_stage;
pub mod write_stage;

View File

@ -1,7 +1,6 @@
//! The `replicate_stage` replicates transactions broadcast by the leader.
use bank::Bank;
use bincode::serialize;
use counter::Counter;
use crdt::Crdt;
use ledger;
@ -9,35 +8,29 @@ use packet::BlobRecycler;
use result::{Error, Result};
use service::Service;
use signature::KeyPair;
use std::collections::VecDeque;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::{responder, BlobReceiver, BlobSender};
use timing;
use transaction::Transaction;
use streamer::{responder, BlobReceiver};
use vote_stage::VoteStage;
use voting::entries_to_votes;
pub struct ReplicateStage {
thread_hdls: Vec<JoinHandle<()>>,
}
const VOTE_TIMEOUT_MS: u64 = 1000;
impl ReplicateStage {
/// Process entry blobs, already in order
fn replicate_requests(
keypair: &Arc<KeyPair>,
bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
window_receiver: &BlobReceiver,
vote_blob_sender: &BlobSender,
last_vote: &mut u64,
) -> Result<()> {
let timer = Duration::new(1, 0);
//coalesce all the available blobs into a single vote
@ -61,30 +54,6 @@ impl ReplicateStage {
error!("process_entries {} {:?}", blobs_len, res);
}
let _ = res?;
let now = timing::timestamp();
if now - *last_vote > VOTE_TIMEOUT_MS {
let last_id = bank.last_id();
let shared_blob = blob_recycler.allocate();
let (vote, addr) = {
let mut wcrdt = crdt.write().unwrap();
//TODO: doesn't seem like there is a synchronous call to get height and id
info!("replicate_stage {:?}", &last_id[..8]);
wcrdt.new_vote(last_id)
}?;
{
let mut blob = shared_blob.write().unwrap();
let tx = Transaction::new_vote(&keypair, vote, last_id, 0);
let bytes = serialize(&tx)?;
let len = bytes.len();
blob.data[..len].copy_from_slice(&bytes);
blob.meta.set_addr(&addr);
blob.meta.size = len;
}
inc_new_counter!("replicate-vote_sent", 1);
*last_vote = now;
vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
}
while let Some(blob) = blobs.pop_front() {
blob_recycler.recycle(blob);
}
@ -96,6 +65,7 @@ impl ReplicateStage {
crdt: Arc<RwLock<Crdt>>,
blob_recycler: BlobRecycler,
window_receiver: BlobReceiver,
exit: Arc<AtomicBool>,
) -> Self {
let (vote_blob_sender, vote_blob_receiver) = channel();
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
@ -105,34 +75,35 @@ impl ReplicateStage {
blob_recycler.clone(),
vote_blob_receiver,
);
let skeypair = Arc::new(keypair);
let vote_stage = VoteStage::new(
Arc::new(keypair),
bank.clone(),
crdt.clone(),
blob_recycler.clone(),
vote_blob_sender,
exit,
);
let t_replicate = Builder::new()
.name("solana-replicate-stage".to_string())
.spawn(move || {
let mut timestamp: u64 = 0;
loop {
if let Err(e) = Self::replicate_requests(
&skeypair,
&bank,
&crdt,
&blob_recycler,
&window_receiver,
&vote_blob_sender,
&mut timestamp,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
}
.spawn(move || loop {
if let Err(e) =
Self::replicate_requests(&bank, &crdt, &blob_recycler, &window_receiver)
{
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
}
}
})
.unwrap();
ReplicateStage {
thread_hdls: vec![t_responder, t_replicate],
}
let mut thread_hdls = vec![t_responder, t_replicate];
thread_hdls.extend(vote_stage.thread_hdls());
ReplicateStage { thread_hdls }
}
}

View File

@ -4,22 +4,22 @@
//! ```text
//! .--------------------------------------------.
//! | |
//! | .--------------------------------+---------.
//! | | TVU | |
//! | | | |
//! | | | | .------------.
//! | | .------------+----------->| Validators |
//! v | .-------. | | | `------------`
//! .----+---. | | | .----+---. .----+------. |
//! | Leader |--------->| Blob | | Window | | Replicate | |
//! `--------` | | Fetch |-->| Stage |-->| Stage | |
//! .------------. | | Stage | | | | | |
//! | Validators |----->| | `--------` `----+------` |
//! `------------` | `-------` | |
//! | | |
//! | | |
//! | | |
//! `--------------------------------|---------`
//! | .--------------------------------+------------.
//! | | TVU | |
//! | | | |
//! | | | | .------------.
//! | | .------------+-------------->| Validators |
//! v | .-------. | | | `------------`
//! .----+---. | | | .----+---. .----+---------. |
//! | Leader |--------->| Blob | | Window | | Replicate | |
//! `--------` | | Fetch |-->| Stage |-->| Stage / | |
//! .------------. | | Stage | | | | Vote Stage | |
//! | Validators |----->| | `--------` `----+---------` |
//! `------------` | `-------` | |
//! | | |
//! | | |
//! | | |
//! `--------------------------------|------------`
//! |
//! v
//! .------.
@ -82,7 +82,7 @@ impl Tvu {
let blob_recycler = BlobRecycler::default();
let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket(
vec![replicate_socket, repair_socket],
exit,
exit.clone(),
&blob_recycler,
);
//TODO
@ -103,6 +103,7 @@ impl Tvu {
crdt,
blob_recycler,
blob_window_receiver,
exit,
);
Tvu {

153
src/vote_stage.rs Normal file
View File

@ -0,0 +1,153 @@
//! The `vote_stage` votes on the `last_id` of the bank at a regular cadence
use bank::Bank;
use bincode::serialize;
use counter::Counter;
use crdt::Crdt;
use hash::Hash;
use packet::BlobRecycler;
use result::Result;
use service::Service;
use signature::KeyPair;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{self, sleep, spawn, JoinHandle};
use std::time::Duration;
use streamer::BlobSender;
use transaction::Transaction;
const VOTE_TIMEOUT_MS: u64 = 1000;
pub struct VoteStage {
thread_hdl: JoinHandle<()>,
}
impl VoteStage {
pub fn new(
keypair: Arc<KeyPair>,
bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>,
blob_recycler: BlobRecycler,
vote_blob_sender: BlobSender,
exit: Arc<AtomicBool>,
) -> Self {
let thread_hdl = spawn(move || {
Self::run(
&keypair,
&bank,
&crdt,
&blob_recycler,
&vote_blob_sender,
&exit,
);
});
VoteStage { thread_hdl }
}
fn run(
keypair: &Arc<KeyPair>,
bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
vote_blob_sender: &BlobSender,
exit: &Arc<AtomicBool>,
) {
while !exit.load(Ordering::Relaxed) {
let last_id = bank.last_id();
if let Err(err) = Self::vote(&last_id, keypair, crdt, blob_recycler, vote_blob_sender) {
info!("Vote failed: {:?}", err);
}
sleep(Duration::from_millis(VOTE_TIMEOUT_MS));
}
}
fn vote(
last_id: &Hash,
keypair: &Arc<KeyPair>,
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
vote_blob_sender: &BlobSender,
) -> Result<()> {
let shared_blob = blob_recycler.allocate();
let (vote, addr) = {
let mut wcrdt = crdt.write().unwrap();
//TODO: doesn't seem like there is a synchronous call to get height and id
info!("replicate_stage {:?}", &last_id[..8]);
wcrdt.new_vote(*last_id)
}?;
{
let mut blob = shared_blob.write().unwrap();
let tx = Transaction::new_vote(&keypair, vote, *last_id, 0);
let bytes = serialize(&tx)?;
let len = bytes.len();
blob.data[..len].copy_from_slice(&bytes);
blob.meta.set_addr(&addr);
blob.meta.size = len;
}
inc_new_counter!("replicate-vote_sent", 1);
vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
Ok(())
}
}
impl Service for VoteStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()?;
Ok(())
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use bank::Bank;
use crdt::{Crdt, TestNode};
use mint::Mint;
use packet::BlobRecycler;
use service::Service;
use signature::{KeyPair, KeyPairUtil};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
/// Ensure the VoteStage issues votes at the expected cadence
#[test]
fn test_vote_cadence() {
let keypair = KeyPair::new();
let mint = Mint::new(1234);
let bank = Arc::new(Bank::new(&mint));
let node = TestNode::new_localhost();
let mut crdt = Crdt::new(node.data.clone()).expect("Crdt::new");
crdt.set_leader(node.data.id);
let blob_recycler = BlobRecycler::default();
let (sender, receiver) = channel();
let exit = Arc::new(AtomicBool::new(false));
let vote_stage = VoteStage::new(
Arc::new(keypair),
bank.clone(),
Arc::new(RwLock::new(crdt)),
blob_recycler.clone(),
sender,
exit.clone(),
);
receiver.recv().unwrap();
let timeout = Duration::from_millis(VOTE_TIMEOUT_MS * 2);
receiver.recv_timeout(timeout).unwrap();
receiver.recv_timeout(timeout).unwrap();
exit.store(true, Ordering::Relaxed);
vote_stage.join().expect("join");
}
}

View File

@ -360,8 +360,8 @@ fn test_leader_restart_validator_start_from_old_ledger() {
let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(expected));
assert_eq!(getbal, Some(expected));
leader_fullnode.close().unwrap();
val_fullnode.close().unwrap();
leader_fullnode.close().unwrap();
std::fs::remove_file(ledger_path).unwrap();
std::fs::remove_file(stale_ledger_path).unwrap();
}