Added check in write stage to exit when scheduled entry_height for leader rotation is detected

This commit is contained in:
Carl 2018-09-11 18:40:38 -07:00 committed by Greg Fitzgerald
parent 91cf14e641
commit f28ba3937b
5 changed files with 320 additions and 16 deletions

View File

@ -37,6 +37,11 @@ use timing::{duration_as_ms, timestamp};
use window::{SharedWindow, WindowIndex};
pub const FULLNODE_PORT_RANGE: (u16, u16) = (8000, 10_000);
#[cfg(test)]
pub const LEADER_ROTATION_INTERVAL: u64 = 10;
#[cfg(not(test))]
pub const LEADER_ROTATION_INTERVAL: u64 = 100;
/// milliseconds we sleep for between gossip requests
const GOSSIP_SLEEP_MILLIS: u64 = 100;
const GOSSIP_PURGE_MILLIS: u64 = 15000;
@ -205,6 +210,9 @@ pub struct Crdt {
/// last time we heard from anyone getting a message fro this public key
/// these are rumers and shouldn't be trusted directly
external_liveness: HashMap<Pubkey, HashMap<Pubkey, u64>>,
/// TODO: Clearly not the correct implementation of this, but a temporary abstraction
/// for testing
pub scheduled_leaders: HashMap<u64, Pubkey>,
}
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
#[derive(Serialize, Deserialize, Debug)]
@ -235,6 +243,7 @@ impl Crdt {
external_liveness: HashMap::new(),
id: node_info.id,
update_index: 1,
scheduled_leaders: HashMap::new(),
};
me.local.insert(node_info.id, me.update_index);
me.table.insert(node_info.id, node_info);
@ -297,6 +306,19 @@ impl Crdt {
self.insert(&me);
}
// TODO: Dummy leader scheduler, need to implement actual leader scheduling.
pub fn get_scheduled_leader(&self, entry_height: u64) -> Option<Pubkey> {
match self.scheduled_leaders.get(&entry_height) {
Some(x) => Some(x.clone()),
None => Some(self.my_data().leader_id),
}
}
// TODO: Dummy leader schedule setter, need to implement actual leader scheduling.
pub fn set_scheduled_leader(&mut self, entry_height: u64, new_leader_id: Pubkey) -> () {
self.scheduled_leaders.insert(entry_height, new_leader_id);
}
pub fn get_external_liveness_entry(&self, key: &Pubkey) -> Option<&HashMap<Pubkey, u64>> {
self.external_liveness.get(key)
}

View File

@ -205,7 +205,7 @@ impl Fullnode {
/// `--------` | | `------------`
/// `-------------------------------`
/// ```
pub fn new_with_bank(
pub fn new_with_bank(
keypair: Keypair,
bank: Bank,
entry_height: u64,
@ -297,6 +297,7 @@ pub fn new_with_bank(
exit.clone(),
ledger_path,
sigverify_disabled,
entry_height,
);
let broadcast_stage = BroadcastStage::new(

View File

@ -7,10 +7,14 @@ use entry::Entry;
use hash::Hash;
use instruction::Vote;
use log::Level::Trace;
#[cfg(test)]
use mint::Mint;
use packet::{self, SharedBlob, BLOB_DATA_SIZE};
use rayon::prelude::*;
use result::{Error, Result};
use signature::Pubkey;
#[cfg(test)]
use signature::{Keypair, KeypairUtil};
use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
use std::io::prelude::*;
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom};
@ -542,6 +546,20 @@ pub fn next_entries(
next_entries_mut(&mut id, &mut num_hashes, transactions)
}
#[cfg(test)]
pub fn tmp_ledger_path(name: &str) -> String {
let keypair = Keypair::new();
format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey())
}
#[cfg(test)]
pub fn genesis(name: &str, num: i64) -> (Mint, String) {
let mint = Mint::new(num);
let path = tmp_ledger_path(name);
let mut writer = LedgerWriter::open(&path, true).unwrap();
writer.write_entries(mint.create_entries()).unwrap();
(mint, path)
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -62,6 +62,7 @@ impl Tpu {
exit: Arc<AtomicBool>,
ledger_path: &str,
sigverify_disabled: bool,
entry_height: u64,
) -> (Self, Receiver<Vec<Entry>>) {
let mut packet_recycler = PacketRecycler::default();
packet_recycler.set_name("tpu::Packet");
@ -89,6 +90,7 @@ impl Tpu {
blob_recycler.clone(),
ledger_path,
entry_receiver,
entry_height,
);
let tpu = Tpu {

View File

@ -4,7 +4,7 @@
use bank::Bank;
use counter::Counter;
use crdt::Crdt;
use crdt::{Crdt, LEADER_ROTATION_INTERVAL};
use entry::Entry;
use ledger::{Block, LedgerWriter};
use log::Level;
@ -12,6 +12,7 @@ use packet::BlobRecycler;
use result::{Error, Result};
use service::Service;
use signature::Keypair;
use std::cmp;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
@ -22,11 +23,60 @@ use streamer::responder;
use timing::{duration_as_ms, duration_as_s};
use vote_stage::send_leader_vote;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum WriteStageReturnType {
LeaderRotation,
ChannelDisconnected,
}
pub struct WriteStage {
thread_hdls: Vec<JoinHandle<()>>,
write_thread: JoinHandle<WriteStageReturnType>,
}
impl WriteStage {
// Given a vector of potential new entries to write, return as many as we can
// fit before we hit the entry height for leader rotation. Also return a boolean
// reflecting whether we actually hit an entry height for leader rotation.
fn find_leader_rotation_index(
crdt: &Arc<RwLock<Crdt>>,
entry_height: u64,
mut new_entries: Vec<Entry>,
) -> (Vec<Entry>, bool) {
// Find out how many more entries we can squeeze in until the next leader
// rotation
let entries_until_leader_rotation =
LEADER_ROTATION_INTERVAL - (entry_height % LEADER_ROTATION_INTERVAL);
let new_entries_length = new_entries.len();
let mut i = cmp::min(entries_until_leader_rotation as usize, new_entries_length);
let mut is_leader_rotation = false;
loop {
if (entry_height + i as u64) % LEADER_ROTATION_INTERVAL == 0 {
let rcrdt = crdt.read().unwrap();
let my_id = rcrdt.my_data().id;
let next_leader = rcrdt.get_scheduled_leader(entry_height + i as u64);
if next_leader != Some(my_id) {
is_leader_rotation = true;
break;
}
}
if i == new_entries_length {
break;
}
i += cmp::min(LEADER_ROTATION_INTERVAL as usize, new_entries_length - i);
}
new_entries.truncate(i as usize);
(new_entries, is_leader_rotation)
}
/// Process any Entry items that have been published by the RecordStage.
/// continuosly send entries out
pub fn write_and_send_entries(
@ -34,19 +84,37 @@ impl WriteStage {
ledger_writer: &mut LedgerWriter,
entry_sender: &Sender<Vec<Entry>>,
entry_receiver: &Receiver<Vec<Entry>>,
entry_height: &mut u64,
) -> Result<()> {
let mut ventries = Vec::new();
let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
let mut num_entries = entries.len();
let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
let mut num_new_entries = received_entries.len();
let mut num_txs = 0;
ventries.push(entries);
while let Ok(more) = entry_receiver.try_recv() {
num_entries += more.len();
ventries.push(more);
loop {
// Find out how many more entries we can squeeze in until the next leader
// rotation
let (new_entries, is_leader_rotation) = Self::find_leader_rotation_index(
crdt,
*entry_height + num_new_entries as u64,
received_entries,
);
num_new_entries += new_entries.len();
ventries.push(new_entries);
if is_leader_rotation {
break;
}
if let Ok(n) = entry_receiver.try_recv() {
received_entries = n;
} else {
break;
}
}
info!("write_stage entries: {}", num_entries);
info!("write_stage entries: {}", num_new_entries);
let to_blobs_total = 0;
let mut blob_send_total = 0;
@ -65,6 +133,9 @@ impl WriteStage {
crdt_votes_total += duration_as_ms(&crdt_votes_start.elapsed());
ledger_writer.write_entries(entries.clone())?;
// Once the entries have been written to the ledger, then we can
// safely incement entry height
*entry_height += entries.len() as u64;
let register_entry_start = Instant::now();
register_entry_total += duration_as_ms(&register_entry_start.elapsed());
@ -105,6 +176,7 @@ impl WriteStage {
blob_recycler: BlobRecycler,
ledger_path: &str,
entry_receiver: Receiver<Vec<Entry>>,
entry_height: u64,
) -> (Self, Receiver<Vec<Entry>>) {
let (vote_blob_sender, vote_blob_receiver) = channel();
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
@ -117,21 +189,47 @@ impl WriteStage {
let (entry_sender, entry_receiver_forward) = channel();
let mut ledger_writer = LedgerWriter::recover(ledger_path).unwrap();
let thread_hdl = Builder::new()
let write_thread = Builder::new()
.name("solana-writer".to_string())
.spawn(move || {
let mut last_vote = 0;
let mut last_valid_validator_timestamp = 0;
let id = crdt.read().unwrap().id;
let mut entry_height = entry_height;
loop {
// Note that entry height is not zero indexed, it starts at 1, so the
// old leader is in power up to and including entry height
// n * LEADER_ROTATION_INTERVAL, so once we've forwarded that last block,
// check for the next leader.
if entry_height % (LEADER_ROTATION_INTERVAL as u64) == 0 {
let rcrdt = crdt.read().unwrap();
let my_id = rcrdt.my_data().id;
let scheduled_leader = rcrdt.get_scheduled_leader(entry_height);
drop(rcrdt);
match scheduled_leader {
Some(id) if id == my_id => (),
// If the leader stays in power for the next
// round as well, then we don't exit. Otherwise, exit.
_ => {
// When the broadcast stage has received the last blob, it
// will signal to close the fetch stage, which will in turn
// close down this write stage
return WriteStageReturnType::LeaderRotation;
}
}
}
if let Err(e) = Self::write_and_send_entries(
&crdt,
&mut ledger_writer,
&entry_sender,
&entry_receiver,
&mut entry_height,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
return WriteStageReturnType::ChannelDisconnected
}
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
inc_new_counter_info!(
@ -158,18 +256,181 @@ impl WriteStage {
}
}).unwrap();
let thread_hdls = vec![t_responder, thread_hdl];
(WriteStage { thread_hdls }, entry_receiver_forward)
let thread_hdls = vec![t_responder];
(
WriteStage {
write_thread,
thread_hdls,
},
entry_receiver_forward,
)
}
}
impl Service for WriteStage {
type JoinReturnType = ();
type JoinReturnType = WriteStageReturnType;
fn join(self) -> thread::Result<()> {
fn join(self) -> thread::Result<WriteStageReturnType> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Ok(())
self.write_thread.join()
}
}
#[cfg(test)]
mod tests {
use bank::Bank;
use crdt::{Crdt, Node, LEADER_ROTATION_INTERVAL};
use entry::Entry;
use ledger::{genesis, read_ledger};
use packet::BlobRecycler;
use recorder::Recorder;
use service::Service;
use signature::{Keypair, KeypairUtil, Pubkey};
use std::fs::remove_dir_all;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use write_stage::{WriteStage, WriteStageReturnType};
fn process_ledger(ledger_path: &str, bank: &Bank) -> (u64, Vec<Entry>) {
let entries = read_ledger(ledger_path, true).expect("opening ledger");
let entries = entries
.map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err)));
info!("processing ledger...");
bank.process_ledger(entries).expect("process_ledger")
}
fn setup_dummy_write_stage() -> (
Pubkey,
WriteStage,
Arc<AtomicBool>,
Sender<Vec<Entry>>,
Receiver<Vec<Entry>>,
Arc<RwLock<Crdt>>,
Arc<Bank>,
String,
Vec<Entry>,
) {
// Setup leader info
let leader_keypair = Keypair::new();
let id = leader_keypair.pubkey();
let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let crdt = Arc::new(RwLock::new(Crdt::new(leader_info.info).expect("Crdt::new")));
let bank = Bank::new_default(true);
let bank = Arc::new(bank);
let blob_recycler = BlobRecycler::default();
// Make a ledger
let (_, leader_ledger_path) = genesis("test_leader_rotation_exit", 10_000);
let (entry_height, ledger_tail) = process_ledger(&leader_ledger_path, &bank);
// Make a dummy pipe
let (entry_sender, entry_receiver) = channel();
// Start up the write stage
let (write_stage, write_stage_entry_receiver) = WriteStage::new(
leader_keypair,
bank.clone(),
crdt.clone(),
blob_recycler,
&leader_ledger_path,
entry_receiver,
entry_height,
);
let exit_sender = Arc::new(AtomicBool::new(false));
(
id,
write_stage,
exit_sender,
entry_sender,
write_stage_entry_receiver,
crdt,
bank,
leader_ledger_path,
ledger_tail,
)
}
#[test]
fn test_write_stage_leader_rotation_exit() {
let (
id,
write_stage,
exit_sender,
entry_sender,
_write_stage_entry_receiver,
crdt,
bank,
leader_ledger_path,
ledger_tail,
) = setup_dummy_write_stage();
crdt.write()
.unwrap()
.set_scheduled_leader(LEADER_ROTATION_INTERVAL, id);
let last_entry_hash = ledger_tail.last().expect("Ledger should not be empty").id;
let genesis_entry_height = ledger_tail.len() as u64;
// Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will
// trigger a check for leader rotation. Because the next scheduled leader
// is ourselves, we won't exit
let mut recorder = Recorder::new(last_entry_hash);
for _ in genesis_entry_height..LEADER_ROTATION_INTERVAL {
let new_entry = recorder.record(vec![]);
entry_sender.send(new_entry).unwrap();
}
// Wait until at least LEADER_ROTATION_INTERVAL have been written to the ledger
loop {
sleep(Duration::from_secs(1));
let (current_entry_height, _) = process_ledger(&leader_ledger_path, &bank);
if current_entry_height == LEADER_ROTATION_INTERVAL {
break;
}
}
// Set the scheduled next leader in the crdt to some other node
let leader2_keypair = Keypair::new();
let leader2_info = Node::new_localhost_with_pubkey(leader2_keypair.pubkey());
{
let mut wcrdt = crdt.write().unwrap();
wcrdt.insert(&leader2_info.info);
wcrdt.set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, leader2_keypair.pubkey());
}
// Input another LEADER_ROTATION_INTERVAL dummy entries one at a time,
// which will take us past the point of the leader rotation.
// The write_stage will see that it's no longer the leader after
// checking the crdt, and exit
for _ in 0..LEADER_ROTATION_INTERVAL {
let new_entry = recorder.record(vec![]);
entry_sender.send(new_entry).unwrap();
}
// Make sure the threads closed cleanly
exit_sender.store(true, Ordering::Relaxed);
assert_eq!(
write_stage.join().unwrap(),
WriteStageReturnType::LeaderRotation
);
// Make sure the ledger contains exactly LEADER_ROTATION_INTERVAL entries
let (entry_height, _) = process_ledger(&leader_ledger_path, &bank);
remove_dir_all(leader_ledger_path).unwrap();
assert_eq!(entry_height, 2 * LEADER_ROTATION_INTERVAL);
}
}