From d3cb161c36d63bf19f010264bae7795eff9b3a62 Mon Sep 17 00:00:00 2001 From: Carl Date: Thu, 13 Sep 2018 18:47:39 -0700 Subject: [PATCH] Added broadcast stage test for leader rotation exit --- src/broadcast_stage.rs | 156 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index adb2137035..f366260943 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -235,3 +235,159 @@ impl Service for BroadcastStage { Ok(()) } } + +#[cfg(test)] +mod tests { + use crdt::{Crdt, LEADER_ROTATION_INTERVAL, Node}; + use entry::Entry; + use ledger::Block; + use hash::Hash; + use packet::BlobRecycler; + use recorder::Recorder; + use service::Service; + use signature::{Keypair, KeypairUtil, Pubkey}; + use std::sync::{Arc, RwLock}; + use std::sync::mpsc::{channel, Receiver}; + use broadcast_stage::BroadcastStage; + use mint::Mint; + use streamer::BlobSender; + use std::cmp; + use window::{new_window_from_entries, SharedWindow}; + use std::thread::sleep; + use std::time::Duration; + + fn setup_dummy_broadcast_stage() -> + (Pubkey, Pubkey, BroadcastStage, SharedWindow, BlobSender, BlobRecycler, Arc>, Vec, Receiver) + { + // Setup dummy leader info + let leader_keypair = Keypair::new(); + let id = leader_keypair.pubkey(); + let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); + + // Give the leader somebody to broadcast to so he isn't lonely + let buddy_keypair = Keypair::new(); + let buddy_id = buddy_keypair.pubkey(); + let broadcast_buddy = Node::new_localhost_with_pubkey(buddy_keypair.pubkey()); + + // Fill the crdt with the buddy's info + let mut crdt = Crdt::new(leader_info.info.clone()).expect("Crdt::new"); + crdt.insert(&broadcast_buddy.info); + let crdt = Arc::new(RwLock::new(crdt)); + let blob_recycler = BlobRecycler::default(); + + // Make dummy initial entries + let mint = Mint::new(10000); + let entries = mint.create_entries(); + let entry_height = entries.len() as u64; + + // Setup a window + let window = + new_window_from_entries(&entries, entry_height, &leader_info.info, &blob_recycler); + + let shared_window = Arc::new(RwLock::new(window)); + + let (blob_sender, blob_receiver) = channel(); + let (exit_sender, exit_receiver) = channel(); + + // Start up the broadcast stage + let broadcast_stage = BroadcastStage::new( + leader_info.sockets.broadcast, + crdt.clone(), + shared_window.clone(), + entry_height, + blob_recycler.clone(), + blob_receiver, + exit_sender, + ); + + ( + id, + buddy_id, + broadcast_stage, + shared_window, + blob_sender, + blob_recycler, + crdt, + entries, + exit_receiver, + ) + } + + fn find_highest_window_index(shared_window: &SharedWindow) -> u64 { + let window = shared_window.read().unwrap(); + window.iter().fold(0, |m, w_slot| { + if let Some(ref blob) = w_slot.data { + cmp::max(m, blob.read().unwrap().get_index().unwrap()) + } else { + m + } + }) + } + + #[test] + fn test_broadcast_stage_leader_rotation_exit() { + let ( + id, + buddy_id, + broadcast_stage, + shared_window, + blob_sender, + blob_recycler, + crdt, + entries, + exit_receiver, + ) = setup_dummy_broadcast_stage(); + { + let mut wcrdt = crdt.write().unwrap(); + // Set leader to myself + wcrdt.set_leader(id); + // Set the leader for the next rotation to also be myself + wcrdt.set_scheduled_leader(LEADER_ROTATION_INTERVAL, id); + } + + let genesis_len = entries.len() as u64; + let last_entry_hash = + entries.last().expect("Ledger should not be empty").id; + + // Input enough entries to make exactly LEADER_ROTATION_INTERVAL entries, which will + // trigger a check for leader rotation. Because the next scheduled leader + // is ourselves, we won't exit + let mut recorder = Recorder::new(last_entry_hash); + + for _ in genesis_len..LEADER_ROTATION_INTERVAL { + let new_entry = recorder.record(vec![]); + let blob = new_entry.to_blobs(&blob_recycler); + blob_sender.send(blob).unwrap(); + } + + // Set the scheduled next leader in the crdt to the other buddy on the network + crdt.write() + .unwrap() + .set_scheduled_leader(2 * LEADER_ROTATION_INTERVAL, buddy_id); + + // Input another LEADER_ROTATION_INTERVAL dummy entries, which will take us + // past the point of the leader rotation. The write_stage will see that + // it's no longer the leader after checking the crdt, and exit + for _ in 0..LEADER_ROTATION_INTERVAL { + let new_entry = recorder.record(vec![]); + let blob = new_entry.to_blobs(&blob_recycler); + match blob_sender.send(blob) { + // We disconnected, break out of loop and check the results + Err(_) => break, + _ => (), + }; + } + + match exit_receiver.recv() { + Ok(x) if x == false => + panic!("Unexpected value on exit channel for Broadcast stage"), + _ => (), + } + + let highest_index = find_highest_window_index(&shared_window); + + assert_eq!(highest_index, 2 * LEADER_ROTATION_INTERVAL - 1); + // Make sure the threads closed cleanly + broadcast_stage.join().unwrap(); + } +}