Cleaned up find_leader_rotation function. Added testing for WriteStage find_leader_rotation_index() function (#1276)

This commit is contained in:
carllin 2018-09-19 18:16:00 -07:00 committed by GitHub
parent 55126f5fb6
commit 7b9c7d4150
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 135 additions and 10 deletions

View File

@ -12,6 +12,8 @@ use packet::BlobRecycler;
use result::{Error, Result}; use result::{Error, Result};
use service::Service; use service::Service;
use signature::Keypair; use signature::Keypair;
#[cfg(test)]
use signature::KeypairUtil;
use std::cmp; use std::cmp;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
@ -44,15 +46,10 @@ impl WriteStage {
entry_height: u64, entry_height: u64,
mut new_entries: Vec<Entry>, mut new_entries: Vec<Entry>,
) -> (Vec<Entry>, bool) { ) -> (Vec<Entry>, bool) {
// Find out how many more entries we can squeeze in until the next leader
// rotation
let entries_until_leader_rotation =
leader_rotation_interval - (entry_height % leader_rotation_interval);
let new_entries_length = new_entries.len(); let new_entries_length = new_entries.len();
let mut i = cmp::min(entries_until_leader_rotation as usize, new_entries_length); // i is the number of entries to take
let mut i = 0;
let mut is_leader_rotation = false; let mut is_leader_rotation = false;
loop { loop {
@ -70,7 +67,18 @@ impl WriteStage {
break; break;
} }
i += cmp::min(leader_rotation_interval as usize, new_entries_length - i); // Find out how many more entries we can squeeze in until the next leader
// rotation
let entries_until_leader_rotation =
leader_rotation_interval - (entry_height % leader_rotation_interval);
// Check the next leader rotation height entries in new_entries, or
// if the new_entries doesnt have that many entries remaining,
// just check the rest of the new_entries_vector
i += cmp::min(
entries_until_leader_rotation as usize,
new_entries_length - i,
);
} }
new_entries.truncate(i as usize); new_entries.truncate(i as usize);
@ -90,7 +98,7 @@ impl WriteStage {
) -> Result<()> { ) -> Result<()> {
let mut ventries = Vec::new(); let mut ventries = Vec::new();
let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
let mut num_new_entries = received_entries.len(); let mut num_new_entries = 0;
let mut num_txs = 0; let mut num_txs = 0;
loop { loop {
@ -205,7 +213,6 @@ impl WriteStage {
let mut entry_height = entry_height; let mut entry_height = entry_height;
let blob_recycler = BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
loop { loop {
info!("write_stage entry height: {}", entry_height);
// Note that entry height is not zero indexed, it starts at 1, so the // Note that entry height is not zero indexed, it starts at 1, so the
// old leader is in power up to and including entry height // old leader is in power up to and including entry height
// n * leader_rotation_interval for some "n". Once we've forwarded // n * leader_rotation_interval for some "n". Once we've forwarded
@ -294,6 +301,7 @@ mod tests {
use bank::Bank; use bank::Bank;
use crdt::{Crdt, Node}; use crdt::{Crdt, Node};
use entry::Entry; use entry::Entry;
use hash::Hash;
use ledger::{genesis, read_ledger}; use ledger::{genesis, read_ledger};
use recorder::Recorder; use recorder::Recorder;
use service::Service; use service::Service;
@ -425,4 +433,121 @@ mod tests {
remove_dir_all(write_stage_info.leader_ledger_path).unwrap(); remove_dir_all(write_stage_info.leader_ledger_path).unwrap();
assert_eq!(entry_height, 2 * leader_rotation_interval); assert_eq!(entry_height, 2 * leader_rotation_interval);
} }
#[test]
fn test_leader_index_calculation() {
// Set up a dummy node
let leader_keypair = Arc::new(Keypair::new());
let my_id = leader_keypair.pubkey();
let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_rotation_interval = 10;
// An epoch is the period of leader_rotation_interval entries
// time during which a leader is in power
let num_epochs = 3;
let mut crdt = Crdt::new(leader_info.info).expect("Crdt::new");
crdt.set_leader_rotation_interval(leader_rotation_interval as u64);
for i in 0..num_epochs {
crdt.set_scheduled_leader(i * leader_rotation_interval, my_id)
}
let crdt = Arc::new(RwLock::new(crdt));
let entry = Entry::new(&Hash::default(), 0, vec![], false);
// A vector that is completely within a certain epoch should return that
// entire vector
let mut len = leader_rotation_interval as usize - 1;
let mut input = vec![entry.clone(); len];
let mut result = WriteStage::find_leader_rotation_index(
&crdt,
leader_rotation_interval,
(num_epochs - 1) * leader_rotation_interval,
input.clone(),
);
assert_eq!(result, (input, false));
// A vector that spans two different epochs for different leaders
// should get truncated
len = leader_rotation_interval as usize - 1;
input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index(
&crdt,
leader_rotation_interval,
(num_epochs * leader_rotation_interval) - 1,
input.clone(),
);
input.truncate(1);
assert_eq!(result, (input, true));
// A vector that triggers a check for leader rotation should return
// the entire vector and signal leader_rotation == false, if the
// same leader is in power for the next epoch as well.
len = 1;
let mut input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index(
&crdt,
leader_rotation_interval,
leader_rotation_interval - 1,
input.clone(),
);
assert_eq!(result, (input, false));
// A vector of new entries that spans two epochs should return the
// entire vector, assuming that the same leader is in power for both epochs.
len = leader_rotation_interval as usize;
input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index(
&crdt,
leader_rotation_interval,
leader_rotation_interval - 1,
input.clone(),
);
assert_eq!(result, (input, false));
// A vector of new entries that spans multiple epochs should return the
// entire vector, assuming that the same leader is in power for both dynasties.
len = (num_epochs - 1) as usize * leader_rotation_interval as usize;
input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index(
&crdt,
leader_rotation_interval,
leader_rotation_interval - 1,
input.clone(),
);
assert_eq!(result, (input, false));
// A vector of new entries that spans multiple leader epochs and has a length
// exactly equal to the remainining number of entries before the next, different
// leader should return the entire vector and signal that leader_rotation == true.
len = (num_epochs - 1) as usize * leader_rotation_interval as usize + 1;
input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index(
&crdt,
leader_rotation_interval,
leader_rotation_interval - 1,
input.clone(),
);
assert_eq!(result, (input, true));
// Start at entry height == the height for leader rotation, should return
// no entries.
len = leader_rotation_interval as usize;
input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index(
&crdt,
leader_rotation_interval,
num_epochs * leader_rotation_interval,
input.clone(),
);
assert_eq!(result, (vec![], true));
}
} }