Made LEADER_ROTATION_INTERVAL settable so that integration tests don't time out

This commit is contained in:
Carl 2018-09-17 15:43:23 -07:00 committed by Greg Fitzgerald
parent 2030dfa435
commit b10de40506
9 changed files with 147 additions and 56 deletions

View File

@ -83,7 +83,7 @@ fn main() -> () {
let node_info = node.info.clone();
let pubkey = keypair.pubkey();
let mut fullnode = Fullnode::new(node, ledger_path, keypair, network, false);
let mut fullnode = Fullnode::new(node, ledger_path, keypair, network, false, None);
// airdrop stuff, probably goes away at some point
let leader = match network {

View File

@ -1,7 +1,7 @@
//! The `broadcast_stage` broadcasts data from a leader node to validators
//!
use counter::Counter;
use crdt::{Crdt, CrdtError, NodeInfo, LEADER_ROTATION_INTERVAL};
use crdt::{Crdt, CrdtError, NodeInfo};
use entry::Entry;
#[cfg(feature = "erasure")]
use erasure;
@ -184,9 +184,16 @@ impl BroadcastStage {
coding: entry_height,
};
let mut receive_index = entry_height;
let me = crdt.read().unwrap().my_data().clone();
let me;
let leader_rotation_interval;
{
let rcrdt = crdt.read().unwrap();
me = rcrdt.my_data().clone();
leader_rotation_interval = rcrdt.get_leader_rotation_interval();
}
loop {
if transmit_index.data % (LEADER_ROTATION_INTERVAL as u64) == 0 {
if transmit_index.data % leader_rotation_interval == 0 {
let rcrdt = crdt.read().unwrap();
let my_id = rcrdt.my_data().id;
match rcrdt.get_scheduled_leader(transmit_index.data) {
@ -272,7 +279,7 @@ impl Service for BroadcastStage {
#[cfg(test)]
mod tests {
use broadcast_stage::{BroadcastStage, BroadcastStageReturnType};
use crdt::{Crdt, Node, LEADER_ROTATION_INTERVAL};
use crdt::{Crdt, Node};
use entry::Entry;
use mint::Mint;
use packet::BlobRecycler;
@ -361,11 +368,12 @@ mod tests {
#[test]
fn test_broadcast_stage_leader_rotation_exit() {
let broadcast_info = setup_dummy_broadcast_stage();
let leader_rotation_interval = 10;
{
let mut wcrdt = broadcast_info.crdt.write().unwrap();
wcrdt.set_leader_rotation_interval(leader_rotation_interval);
// Set the leader for the next rotation to be myself
wcrdt.set_scheduled_leader(LEADER_ROTATION_INTERVAL, broadcast_info.my_id);
wcrdt.set_scheduled_leader(leader_rotation_interval, broadcast_info.my_id);
}
let genesis_len = broadcast_info.entries.len() as u64;
@ -375,12 +383,12 @@ mod tests {
.expect("Ledger should not be empty")
.id;
// Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will
// Input enough entries to make exactly leader_rotation_interval entries, which will
// trigger a check for leader rotation. Because the next scheduled leader
// is ourselves, we won't exit
let mut recorder = Recorder::new(last_entry_hash);
for _ in genesis_len..LEADER_ROTATION_INTERVAL {
for _ in genesis_len..leader_rotation_interval {
let new_entry = recorder.record(vec![]);
broadcast_info.entry_sender.send(new_entry).unwrap();
}
@ -390,12 +398,12 @@ mod tests {
.crdt
.write()
.unwrap()
.set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, broadcast_info.buddy_id);
.set_scheduled_leader(2 * leader_rotation_interval, broadcast_info.buddy_id);
// Input another LEADER_ROTATION_INTERVAL dummy entries, which will take us
// Input another leader_rotation_interval dummy entries, which will take us
// past the point of the leader rotation. The write_stage will see that
// it's no longer the leader after checking the crdt, and exit
for _ in 0..LEADER_ROTATION_INTERVAL {
for _ in 0..leader_rotation_interval {
let new_entry = recorder.record(vec![]);
match broadcast_info.entry_sender.send(new_entry) {
// We disconnected, break out of loop and check the results
@ -413,6 +421,6 @@ mod tests {
let highest_index = find_highest_window_index(&broadcast_info.shared_window);
// The blob index is zero indexed, so it will always be one behind the entry height
// which starts at one.
assert_eq!(highest_index, 2 * LEADER_ROTATION_INTERVAL - 1);
assert_eq!(highest_index, 2 * leader_rotation_interval - 1);
}
}

View File

@ -37,10 +37,6 @@ use timing::{duration_as_ms, timestamp};
use window::{SharedWindow, WindowIndex};
pub const FULLNODE_PORT_RANGE: (u16, u16) = (8000, 10_000);
#[cfg(test)]
pub const LEADER_ROTATION_INTERVAL: u64 = 10;
#[cfg(not(test))]
pub const LEADER_ROTATION_INTERVAL: u64 = 100;
/// milliseconds we sleep for between gossip requests
const GOSSIP_SLEEP_MILLIS: u64 = 100;
@ -213,7 +209,11 @@ pub struct Crdt {
/// TODO: Clearly not the correct implementation of this, but a temporary abstraction
/// for testing
pub scheduled_leaders: HashMap<u64, Pubkey>,
// TODO: Is there a better way to do this? We didn't make this a constant because
// we want to be able to set it in integration tests so that the tests don't time out.
pub leader_rotation_interval: u64,
}
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
#[derive(Serialize, Deserialize, Debug)]
enum Protocol {
@ -244,6 +244,7 @@ impl Crdt {
id: node_info.id,
update_index: 1,
scheduled_leaders: HashMap::new(),
leader_rotation_interval: 100,
};
me.local.insert(node_info.id, me.update_index);
me.table.insert(node_info.id, node_info);
@ -314,6 +315,14 @@ impl Crdt {
}
}
/// Overrides the leader rotation interval (number of entries per leader slot).
/// Intended for integration tests, which set a small interval so rotations
/// happen quickly enough that the tests don't time out.
pub fn set_leader_rotation_interval(&mut self, leader_rotation_interval: u64) {
self.leader_rotation_interval = leader_rotation_interval;
}
/// Returns the current leader rotation interval in entries
/// (defaults to 100; see `Crdt::new`).
pub fn get_leader_rotation_interval(&self) -> u64 {
self.leader_rotation_interval
}
// TODO: Dummy leader schedule setter, need to implement actual leader scheduling.
pub fn set_scheduled_leader(&mut self, entry_height: u64, new_leader_id: Pubkey) -> () {
self.scheduled_leaders.insert(entry_height, new_leader_id);

View File

@ -337,6 +337,7 @@ mod tests {
None,
&ledger_path,
false,
None,
);
let mut addr: SocketAddr = "0.0.0.0:9900".parse().expect("bind to drone socket");
@ -373,7 +374,7 @@ mod tests {
let leader_keypair = Keypair::new();
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.info.clone();
let server = Fullnode::new(leader, &ledger_path, leader_keypair, None, false);
let server = Fullnode::new(leader, &ledger_path, leader_keypair, None, false, None);
let requests_socket = UdpSocket::bind("0.0.0.0:0").expect("drone bind to requests socket");
let transactions_socket =

View File

@ -121,6 +121,7 @@ impl Fullnode {
keypair: Keypair,
leader_addr: Option<SocketAddr>,
sigverify_disabled: bool,
leader_rotation_interval: Option<u64>,
) -> Self {
info!("creating bank...");
let (bank, entry_height, ledger_tail) = Self::new_bank_from_ledger(ledger_path);
@ -145,6 +146,7 @@ impl Fullnode {
leader_info.as_ref(),
ledger_path,
sigverify_disabled,
leader_rotation_interval,
);
match leader_addr {
@ -224,6 +226,7 @@ impl Fullnode {
leader_info: Option<&NodeInfo>,
ledger_path: &str,
sigverify_disabled: bool,
leader_rotation_interval: Option<u64>,
) -> Self {
if leader_info.is_none() {
node.info.leader_id = node.info.id;
@ -257,7 +260,11 @@ impl Fullnode {
window::new_window_from_entries(ledger_tail, entry_height, &node.info, &blob_recycler);
let shared_window = Arc::new(RwLock::new(window));
let crdt = Arc::new(RwLock::new(Crdt::new(node.info).expect("Crdt::new")));
let mut crdt = Crdt::new(node.info).expect("Crdt::new");
if let Some(interval) = leader_rotation_interval {
crdt.set_leader_rotation_interval(interval);
}
let crdt = Arc::new(RwLock::new(crdt));
let ncp = Ncp::new(
&crdt,
@ -440,8 +447,10 @@ impl Fullnode {
// TODO: only used for testing, get rid of this once we have actual
// leader scheduling
pub fn set_scheduled_leader(&self, leader_id: Pubkey, entry_height: u64) {
let mut wcrdt = self.crdt.write().unwrap();
wcrdt.set_scheduled_leader(entry_height, leader_id);
self.crdt
.write()
.unwrap()
.set_scheduled_leader(entry_height, leader_id);
}
fn new_bank_from_ledger(ledger_path: &str) -> (Bank, u64, Vec<Entry>) {
@ -508,6 +517,7 @@ mod tests {
Some(&entry),
&validator_ledger_path,
false,
None,
);
v.close().unwrap();
remove_dir_all(validator_ledger_path).unwrap();
@ -533,6 +543,7 @@ mod tests {
Some(&entry),
&validator_ledger_path,
false,
None,
)
}).collect();

View File

@ -472,6 +472,7 @@ mod tests {
None,
&ledger_path,
false,
None,
);
sleep(Duration::from_millis(900));
@ -517,6 +518,7 @@ mod tests {
None,
&ledger_path,
false,
None,
);
//TODO: remove this sleep, or add a retry so CI is stable
sleep(Duration::from_millis(300));
@ -575,6 +577,7 @@ mod tests {
None,
&ledger_path,
false,
None,
);
sleep(Duration::from_millis(300));
@ -634,6 +637,7 @@ mod tests {
None,
&ledger_path,
false,
None,
);
sleep(Duration::from_millis(900));

View File

@ -387,6 +387,7 @@ mod tests {
None,
&ledger_path,
false,
None,
);
sleep(Duration::from_millis(200));
@ -453,6 +454,7 @@ mod tests {
None,
&ledger_path,
false,
None,
);
sleep(Duration::from_millis(200));

View File

@ -4,7 +4,7 @@
use bank::Bank;
use counter::Counter;
use crdt::{Crdt, LEADER_ROTATION_INTERVAL};
use crdt::Crdt;
use entry::Entry;
use ledger::{Block, LedgerWriter};
use log::Level;
@ -40,13 +40,14 @@ impl WriteStage {
// reflecting whether we actually hit an entry height for leader rotation.
fn find_leader_rotation_index(
crdt: &Arc<RwLock<Crdt>>,
leader_rotation_interval: u64,
entry_height: u64,
mut new_entries: Vec<Entry>,
) -> (Vec<Entry>, bool) {
// Find out how many more entries we can squeeze in until the next leader
// rotation
let entries_until_leader_rotation =
LEADER_ROTATION_INTERVAL - (entry_height % LEADER_ROTATION_INTERVAL);
leader_rotation_interval - (entry_height % leader_rotation_interval);
let new_entries_length = new_entries.len();
@ -55,7 +56,7 @@ impl WriteStage {
let mut is_leader_rotation = false;
loop {
if (entry_height + i as u64) % LEADER_ROTATION_INTERVAL == 0 {
if (entry_height + i as u64) % leader_rotation_interval == 0 {
let rcrdt = crdt.read().unwrap();
let my_id = rcrdt.my_data().id;
let next_leader = rcrdt.get_scheduled_leader(entry_height + i as u64);
@ -69,7 +70,7 @@ impl WriteStage {
break;
}
i += cmp::min(LEADER_ROTATION_INTERVAL as usize, new_entries_length - i);
i += cmp::min(leader_rotation_interval as usize, new_entries_length - i);
}
new_entries.truncate(i as usize);
@ -85,6 +86,7 @@ impl WriteStage {
entry_sender: &Sender<Vec<Entry>>,
entry_receiver: &Receiver<Vec<Entry>>,
entry_height: &mut u64,
leader_rotation_interval: u64,
) -> Result<()> {
let mut ventries = Vec::new();
let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
@ -96,6 +98,7 @@ impl WriteStage {
// rotation
let (new_entries, is_leader_rotation) = Self::find_leader_rotation_index(
crdt,
leader_rotation_interval,
*entry_height + num_new_entries as u64,
received_entries,
);
@ -194,14 +197,20 @@ impl WriteStage {
.spawn(move || {
let mut last_vote = 0;
let mut last_valid_validator_timestamp = 0;
let id = crdt.read().unwrap().id;
let id;
let leader_rotation_interval;
{
let rcrdt = crdt.read().unwrap();
id = rcrdt.id;
leader_rotation_interval = rcrdt.get_leader_rotation_interval();
}
let mut entry_height = entry_height;
loop {
// Note that entry height is not zero indexed, it starts at 1, so the
// old leader is in power up to and including entry height
// n * LEADER_ROTATION_INTERVAL for some "n". Once we've forwarded
// n * leader_rotation_interval for some "n". Once we've forwarded
// that last block, check for the next scheduled leader.
if entry_height % (LEADER_ROTATION_INTERVAL as u64) == 0 {
if entry_height % leader_rotation_interval == 0 {
let rcrdt = crdt.read().unwrap();
let my_id = rcrdt.my_data().id;
let scheduled_leader = rcrdt.get_scheduled_leader(entry_height);
@ -225,6 +234,7 @@ impl WriteStage {
&entry_sender,
&entry_receiver,
&mut entry_height,
leader_rotation_interval,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
@ -282,7 +292,7 @@ impl Service for WriteStage {
#[cfg(test)]
mod tests {
use bank::Bank;
use crdt::{Crdt, Node, LEADER_ROTATION_INTERVAL};
use crdt::{Crdt, Node};
use entry::Entry;
use ledger::{genesis, read_ledger};
use packet::BlobRecycler;
@ -360,12 +370,14 @@ mod tests {
#[test]
fn test_write_stage_leader_rotation_exit() {
let write_stage_info = setup_dummy_write_stage();
let leader_rotation_interval = 10;
write_stage_info
.crdt
.write()
.unwrap()
.set_scheduled_leader(LEADER_ROTATION_INTERVAL, write_stage_info.my_id);
{
let mut wcrdt = write_stage_info.crdt.write().unwrap();
wcrdt.set_leader_rotation_interval(leader_rotation_interval);
wcrdt.set_scheduled_leader(leader_rotation_interval, write_stage_info.my_id);
}
let last_entry_hash = write_stage_info
.ledger_tail
@ -375,11 +387,11 @@ mod tests {
let genesis_entry_height = write_stage_info.ledger_tail.len() as u64;
// Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will
// Input enough entries to make exactly leader_rotation_interval entries, which will
// trigger a check for leader rotation. Because the next scheduled leader
// is ourselves, we won't exit
let mut recorder = Recorder::new(last_entry_hash);
for _ in genesis_entry_height..LEADER_ROTATION_INTERVAL {
for _ in genesis_entry_height..leader_rotation_interval {
let new_entry = recorder.record(vec![]);
write_stage_info.entry_sender.send(new_entry).unwrap();
}
@ -391,14 +403,14 @@ mod tests {
{
let mut wcrdt = write_stage_info.crdt.write().unwrap();
wcrdt.insert(&leader2_info.info);
wcrdt.set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, leader2_keypair.pubkey());
wcrdt.set_scheduled_leader(2 * leader_rotation_interval, leader2_keypair.pubkey());
}
// Input another LEADER_ROTATION_INTERVAL dummy entries one at a time,
// Input another leader_rotation_interval dummy entries one at a time,
// which will take us past the point of the leader rotation.
// The write_stage will see that it's no longer the leader after
// checking the schedule, and exit
for _ in 0..LEADER_ROTATION_INTERVAL {
for _ in 0..leader_rotation_interval {
let new_entry = recorder.record(vec![]);
write_stage_info.entry_sender.send(new_entry).unwrap();
}
@ -408,10 +420,10 @@ mod tests {
WriteStageReturnType::LeaderRotation
);
// Make sure the ledger contains exactly LEADER_ROTATION_INTERVAL entries
// Make sure the ledger contains exactly 2 * leader_rotation_interval entries
let (entry_height, _) =
process_ledger(&write_stage_info.leader_ledger_path, &write_stage_info.bank);
remove_dir_all(write_stage_info.leader_ledger_path).unwrap();
assert_eq!(entry_height, 2 * LEADER_ROTATION_INTERVAL);
assert_eq!(entry_height, 2 * leader_rotation_interval);
}
}

View File

@ -5,7 +5,7 @@ extern crate chrono;
extern crate serde_json;
extern crate solana;
use solana::crdt::{Crdt, Node, NodeInfo, LEADER_ROTATION_INTERVAL};
use solana::crdt::{Crdt, Node, NodeInfo};
use solana::entry::Entry;
use solana::fullnode::{Fullnode, FullnodeReturnType};
use solana::hash::Hash;
@ -145,7 +145,14 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
writer.write_entries(entries).unwrap();
}
let leader = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, false);
let leader = Fullnode::new(
leader,
&leader_ledger_path,
leader_keypair,
None,
false,
None,
);
// Send leader some tokens to vote
let leader_balance =
@ -163,6 +170,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
keypair,
Some(leader_data.contact_info.ncp),
false,
None,
);
// contains the leader and new node
@ -218,7 +226,14 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
);
ledger_paths.push(zero_ledger_path.clone());
let server = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, false);
let server = Fullnode::new(
leader,
&leader_ledger_path,
leader_keypair,
None,
false,
None,
);
// Send leader some tokens to vote
let leader_balance =
@ -241,6 +256,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
keypair,
Some(leader_data.contact_info.ncp),
false,
None,
);
nodes.push(val);
}
@ -276,6 +292,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
keypair,
Some(leader_data.contact_info.ncp),
false,
None,
);
nodes.push(val);
//contains the leader and new node
@ -335,7 +352,14 @@ fn test_multi_node_basic() {
let (alice, leader_ledger_path, _) = genesis("multi_node_basic", 10_000);
ledger_paths.push(leader_ledger_path.clone());
let server = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, false);
let server = Fullnode::new(
leader,
&leader_ledger_path,
leader_keypair,
None,
false,
None,
);
// Send leader some tokens to vote
let leader_balance =
@ -354,6 +378,7 @@ fn test_multi_node_basic() {
keypair,
Some(leader_data.contact_info.ncp),
false,
None,
);
nodes.push(val);
}
@ -396,7 +421,14 @@ fn test_boot_validator_from_file() -> result::Result<()> {
ledger_paths.push(leader_ledger_path.clone());
let leader_data = leader.info.clone();
let leader_fullnode = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, false);
let leader_fullnode = Fullnode::new(
leader,
&leader_ledger_path,
leader_keypair,
None,
false,
None,
);
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap();
assert_eq!(leader_balance, 500);
@ -415,6 +447,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
keypair,
Some(leader_data.contact_info.ncp),
false,
None,
);
let mut client = mk_client(&validator_data);
let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance));
@ -433,7 +466,7 @@ fn create_leader(ledger_path: &str) -> (NodeInfo, Fullnode) {
let leader_keypair = Keypair::new();
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.info.clone();
let leader_fullnode = Fullnode::new(leader, &ledger_path, leader_keypair, None, false);
let leader_fullnode = Fullnode::new(leader, &ledger_path, leader_keypair, None, false, None);
(leader_data, leader_fullnode)
}
@ -487,6 +520,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
keypair,
Some(leader_data.contact_info.ncp),
false,
None,
);
// trigger broadcast, validator should catch up from leader, whose window contains
@ -544,7 +578,14 @@ fn test_multi_node_dynamic_network() {
let alice_arc = Arc::new(RwLock::new(alice));
let leader_data = leader.info.clone();
let server = Fullnode::new(leader, &leader_ledger_path, leader_keypair, None, true);
let server = Fullnode::new(
leader,
&leader_ledger_path,
leader_keypair,
None,
true,
None,
);
// Send leader some tokens to vote
let leader_balance = send_tx_and_retry_get_balance(
@ -617,6 +658,7 @@ fn test_multi_node_dynamic_network() {
keypair,
Some(leader_data.contact_info.ncp),
true,
None,
);
(rd, val)
}).unwrap()
@ -712,6 +754,7 @@ fn test_multi_node_dynamic_network() {
#[test]
fn test_leader_to_validator_transition() {
logger::setup();
let leader_rotation_interval = 20;
// Make a dummy address to be the sink for this test's mock transactions
let bob_pubkey = Keypair::new().pubkey();
@ -720,7 +763,7 @@ fn test_leader_to_validator_transition() {
// in the leader ledger
let (mint, leader_ledger_path, entries) = genesis(
"test_leader_to_validator_transition",
(3 * LEADER_ROTATION_INTERVAL) as i64,
(3 * leader_rotation_interval) as i64,
);
let genesis_height = entries.len() as u64;
@ -737,10 +780,11 @@ fn test_leader_to_validator_transition() {
leader_keypair,
None,
false,
Some(leader_rotation_interval),
);
// Set the next leader to be Bob
leader.set_scheduled_leader(bob_pubkey, LEADER_ROTATION_INTERVAL);
leader.set_scheduled_leader(bob_pubkey, leader_rotation_interval);
// Make an extra node for our leader to broadcast to,
// who won't vote and mess with our leader's entry count
@ -762,14 +806,14 @@ fn test_leader_to_validator_transition() {
assert!(converged);
let extra_transactions = std::cmp::max(LEADER_ROTATION_INTERVAL / 20, 1);
let extra_transactions = std::cmp::max(leader_rotation_interval / 3, 1);
// Push leader "extra_transactions" past LEADER_ROTATION_INTERVAL entry height,
// Push leader "extra_transactions" past leader_rotation_interval entry height,
// make sure the leader stops.
assert!(genesis_height < LEADER_ROTATION_INTERVAL);
for i in genesis_height..(LEADER_ROTATION_INTERVAL + extra_transactions) {
assert!(genesis_height < leader_rotation_interval);
for i in genesis_height..(leader_rotation_interval + extra_transactions) {
let expected_balance = std::cmp::min(
LEADER_ROTATION_INTERVAL - genesis_height,
leader_rotation_interval - genesis_height,
i - genesis_height,
);
@ -793,7 +837,7 @@ fn test_leader_to_validator_transition() {
// transactions earlier
let mut leader_client = mk_client(&leader_info);
let expected_bal = LEADER_ROTATION_INTERVAL - genesis_height;
let expected_bal = leader_rotation_interval - genesis_height;
let bal = leader_client
.poll_get_balance(&bob_pubkey)
.expect("Expected success when polling newly transitioned validator for balance")