Fix TVU and PoH Recorder going out of sync (#3164)

* Fix broadcast_stage error

* Account for very fast ticks in tick verification
Sagar Dhawan authored 2019-03-07 15:49:07 -08:00; committed by GitHub
parent 8d80da6b46
commit 02eb234399
7 changed files with 179 additions and 82 deletions
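For orientation before the per-file hunks: the fix leans on the slot-to-tick-height arithmetic that both the broadcast loop and the new ledger tick verification use. The sketch below is not part of the commit; it only restates that arithmetic, and the ticks_per_slot of 8 in the asserts is an illustrative value, not the SDK default.

/// Highest tick height that still belongs to `slot`; the broadcast loop drains
/// the entry receiver until it has seen this tick.
fn max_tick_height(slot: u64, ticks_per_slot: u64) -> u64 {
    (slot + 1) * ticks_per_slot - 1
}

/// Number of ticks a slot must contain when its parent is `parent_slot`
/// (skipped slots contribute their ticks too), mirroring `verify_ledger_ticks`.
fn expected_ticks(slot: u64, parent_slot: u64, ticks_per_slot: u64) -> u64 {
    (slot - parent_slot) * ticks_per_slot
}

fn main() {
    let ticks_per_slot = 8; // illustrative value only
    assert_eq!(max_tick_height(0, ticks_per_slot), 7);
    assert_eq!(max_tick_height(1, ticks_per_slot), 15);
    // A slot whose parent is two slots back owns two slots' worth of ticks.
    assert_eq!(expected_ticks(3, 1, ticks_per_slot), 16);
}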

View File

@ -204,7 +204,7 @@ pub fn process_blocktree(
if next_meta.is_full() {
let next_bank = Arc::new(Bank::new_from_parent(
&bank,
leader_schedule_utils::slot_leader_at(next_slot, &bank),
leader_schedule_utils::slot_leader_at(next_slot, &bank).unwrap(),
next_slot,
));
trace!("Add child bank for slot={}", next_slot);

View File

@ -43,16 +43,8 @@ impl Broadcast {
blocktree: &Arc<Blocktree>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let (bank, entries) = receiver.recv_timeout(timer)?;
let mut broadcast_table = cluster_info
.read()
.unwrap()
.sorted_tvu_peers(&staking_utils::delegated_stakes(&bank));
// Layer 1, leader nodes are limited to the fanout size.
broadcast_table.truncate(DATA_PLANE_FANOUT);
inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
let max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1;
let (mut bank, entries) = receiver.recv_timeout(timer)?;
let mut max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1;
let now = Instant::now();
let mut num_entries = entries.len();
@ -60,17 +52,35 @@ impl Broadcast {
let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0);
ventries.push(entries);
while let Ok((same_bank, entries)) = receiver.try_recv() {
num_entries += entries.len();
last_tick = entries.last().map(|v| v.1).unwrap_or(0);
ventries.push(entries);
assert!(last_tick <= max_tick_height);
assert!(same_bank.slot() == bank.slot());
if last_tick == max_tick_height {
break;
assert!(last_tick <= max_tick_height,);
if last_tick != max_tick_height {
while let Ok((same_bank, entries)) = receiver.try_recv() {
// If the bank changed, that implies the previous slot was interrupted and we do not have to
// broadcast its entries.
if same_bank.slot() != bank.slot() {
num_entries = 0;
ventries.clear();
bank = same_bank.clone();
max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1;
}
num_entries += entries.len();
last_tick = entries.last().map(|v| v.1).unwrap_or(0);
ventries.push(entries);
assert!(last_tick <= max_tick_height,);
if last_tick == max_tick_height {
break;
}
}
}
let mut broadcast_table = cluster_info
.read()
.unwrap()
.sorted_tvu_peers(&staking_utils::delegated_stakes(&bank));
// Layer 1, leader nodes are limited to the fanout size.
broadcast_table.truncate(DATA_PLANE_FANOUT);
inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
inc_new_counter_info!("broadcast_service-entries_received", num_entries);
let to_blobs_start = Instant::now();
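A condensed, standalone restatement of the receive loop above (not the crate's actual code: `FakeBank`, `Entries`, and std `mpsc` are stand-ins for the real `Bank` handle and channel types). The point is the reset: if a tuple for a different slot arrives, the previous slot was interrupted, so its buffered entries are discarded and the target tick height is recomputed for the new slot.

use std::sync::mpsc::Receiver;

struct FakeBank {
    slot: u64,
    ticks_per_slot: u64,
}

// (entry payload, tick height) stand-in for the crate's (Entry, u64) tuples.
type Entries = Vec<(String, u64)>;

fn drain_slot(receiver: &Receiver<(FakeBank, Entries)>) -> (u64, Vec<Entries>) {
    let (mut bank, entries) = receiver.recv().expect("sender hung up");
    let mut max_tick_height = (bank.slot + 1) * bank.ticks_per_slot - 1;
    let mut last_tick = entries.last().map(|e| e.1).unwrap_or(0);
    let mut ventries = vec![entries];

    if last_tick != max_tick_height {
        while let Ok((same_bank, entries)) = receiver.try_recv() {
            if same_bank.slot != bank.slot {
                // The slot changed mid-stream: the previous slot was interrupted,
                // so drop everything buffered for it and retarget the new slot.
                ventries.clear();
                bank = same_bank;
                max_tick_height = (bank.slot + 1) * bank.ticks_per_slot - 1;
            }
            last_tick = entries.last().map(|e| e.1).unwrap_or(0);
            ventries.push(entries);
            assert!(last_tick <= max_tick_height);
            if last_tick == max_tick_height {
                break;
            }
        }
    }
    (bank.slot, ventries)
}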

View File

@ -1,10 +1,13 @@
use crate::blocktree::Blocktree;
/// Cluster independent integration tests
///
/// All tests must start from an entry point and a funding keypair and
/// discover the rest of the network.
use crate::client::mk_client;
use crate::contact_info::ContactInfo;
use crate::entry::{Entry, EntrySlice};
use crate::gossip_service::discover;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::timing::{DEFAULT_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT, NUM_TICKS_PER_SECOND};
@ -59,6 +62,39 @@ pub fn fullnode_exit(entry_point_info: &ContactInfo, nodes: usize) {
}
}
pub fn verify_ledger_ticks(ledger_path: &str, ticks_per_slot: usize) {
let ledger = Blocktree::open(ledger_path).unwrap();
let zeroth_slot = ledger.get_slot_entries(0, 0, None).unwrap();
let last_id = zeroth_slot.last().unwrap().hash;
let next_slots = ledger.get_slots_since(&[0]).unwrap().remove(&0).unwrap();
let mut pending_slots: Vec<_> = next_slots
.into_iter()
.map(|slot| (slot, 0, last_id))
.collect();
while !pending_slots.is_empty() {
let (slot, parent_slot, last_id) = pending_slots.pop().unwrap();
let next_slots = ledger
.get_slots_since(&[slot])
.unwrap()
.remove(&slot)
.unwrap();
// If you're not the last slot, you should have a full set of ticks
let should_verify_ticks = if !next_slots.is_empty() {
Some((slot - parent_slot) as usize * ticks_per_slot)
} else {
None
};
let last_id = verify_slot_ticks(&ledger, slot, &last_id, should_verify_ticks);
pending_slots.extend(
next_slots
.into_iter()
.map(|child_slot| (child_slot, slot, last_id)),
);
}
}
pub fn kill_entry_and_spend_and_verify_rest(
entry_point_info: &ContactInfo,
funding_keypair: &Keypair,
@ -105,3 +141,23 @@ pub fn kill_entry_and_spend_and_verify_rest(
}
}
}
fn get_and_verify_slot_entries(blocktree: &Blocktree, slot: u64, last_entry: &Hash) -> Vec<Entry> {
let entries = blocktree.get_slot_entries(slot, 0, None).unwrap();
assert!(entries.verify(last_entry));
entries
}
fn verify_slot_ticks(
blocktree: &Blocktree,
slot: u64,
last_entry: &Hash,
expected_num_ticks: Option<usize>,
) -> Hash {
let entries = get_and_verify_slot_entries(blocktree, slot, last_entry);
let num_ticks: usize = entries.iter().map(|entry| entry.is_tick() as usize).sum();
if let Some(expected_num_ticks) = expected_num_ticks {
assert_eq!(num_ticks, expected_num_ticks);
}
entries.last().unwrap().hash
}
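A hedged usage sketch of the new helper: in a test it runs against a ledger directory that survives shutdown, which is how `test_two_unbalanced_stakes` at the end of this commit drives it. The `ledger_path` argument here is whatever the caller preserved, e.g. an entry of `LocalCluster::ledger_paths` after `close_preserve_ledgers`.

use solana::cluster_tests;
use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;

// Panics if any non-leaf slot is short of ticks or if a slot's entries do not
// hash-chain from its parent's last entry; leaf slots may still be mid-fill,
// so only their hash chain is checked.
fn check_preserved_ledger(ledger_path: &str) {
    cluster_tests::verify_ledger_ticks(ledger_path, DEFAULT_TICKS_PER_SLOT as usize);
}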

View File

@ -4,14 +4,14 @@ use solana_runtime::bank::Bank;
use solana_sdk::pubkey::Pubkey;
/// Return the leader schedule for the given epoch.
fn leader_schedule(epoch_height: u64, bank: &Bank) -> LeaderSchedule {
let stakes = staking_utils::delegated_stakes_at_epoch(bank, epoch_height)
.expect("epoch state must exist");
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch_height.to_le_bytes());
let mut stakes: Vec<_> = stakes.into_iter().collect();
sort_stakes(&mut stakes);
LeaderSchedule::new(&stakes, seed, bank.get_slots_in_epoch(epoch_height))
fn leader_schedule(epoch_height: u64, bank: &Bank) -> Option<LeaderSchedule> {
staking_utils::delegated_stakes_at_epoch(bank, epoch_height).map(|stakes| {
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch_height.to_le_bytes());
let mut stakes: Vec<_> = stakes.into_iter().collect();
sort_stakes(&mut stakes);
LeaderSchedule::new(&stakes, seed, bank.get_slots_in_epoch(epoch_height))
})
}
fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
@ -31,11 +31,10 @@ fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
}
/// Return the leader for the given slot.
pub fn slot_leader_at(slot: u64, bank: &Bank) -> Pubkey {
pub fn slot_leader_at(slot: u64, bank: &Bank) -> Option<Pubkey> {
let (epoch, slot_index) = bank.get_epoch_and_slot_index(slot);
let leader_schedule = leader_schedule(epoch, bank);
leader_schedule[slot_index as usize]
leader_schedule(epoch, bank).map(|leader_schedule| leader_schedule[slot_index as usize])
}
// Returns the number of ticks remaining from the specified tick_height to the end of the
@ -85,7 +84,7 @@ mod tests {
)
.0;
let bank = Bank::new(&genesis_block);
assert_eq!(slot_leader_at(bank.slot(), &bank), pubkey);
assert_eq!(slot_leader_at(bank.slot(), &bank).unwrap(), pubkey);
}
#[test]
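The two call-site patterns that the new `Option` return forces, condensed into one hedged sketch (module paths are assumed from the imports visible elsewhere in this commit; `slot`, `bank`, and `my_id` are placeholders for values a real caller already has):

use solana::leader_schedule_utils;
use solana_runtime::bank::Bank;
use solana_sdk::pubkey::Pubkey;

fn leader_lookup(slot: u64, bank: &Bank, my_id: Pubkey) {
    // blocktree_processor-style call site: the epoch's stake state must exist
    // for a slot being replayed, so unwrapping is acceptable there.
    let _leader: Pubkey = leader_schedule_utils::slot_leader_at(slot, bank).unwrap();

    // replay_stage-style call site: an unknown leader is survivable, so the
    // Option is inspected and the None case only logs.
    match leader_schedule_utils::slot_leader_at(slot, bank) {
        Some(next_leader) if next_leader == my_id => { /* start the TPU for this slot */ }
        Some(_) => { /* another node leads this slot */ }
        None => eprintln!("No next leader found"),
    }
}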

View File

@ -21,33 +21,26 @@ pub struct LocalCluster {
pub funding_keypair: Keypair,
/// Entry point from which the rest of the network can be discovered
pub entry_point_info: NodeInfo,
pub ledger_paths: Vec<String>,
fullnodes: Vec<Fullnode>,
ledger_paths: Vec<String>,
}
impl LocalCluster {
pub fn new(num_nodes: usize, cluster_lamports: u64, lamports_per_node: u64) -> Self {
Self::new_with_config(
num_nodes,
cluster_lamports,
lamports_per_node,
&FullnodeConfig::default(),
)
let stakes: Vec<_> = (0..num_nodes).map(|_| lamports_per_node).collect();
Self::new_with_config(&stakes, cluster_lamports, &FullnodeConfig::default())
}
pub fn new_with_config(
num_nodes: usize,
node_stakes: &[u64],
cluster_lamports: u64,
lamports_per_node: u64,
fullnode_config: &FullnodeConfig,
) -> Self {
// Must have enough tokens to fund vote account and set delegate
assert!(lamports_per_node > 2);
let leader_keypair = Arc::new(Keypair::new());
let leader_pubkey = leader_keypair.pubkey();
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let (genesis_block, mint_keypair) =
GenesisBlock::new_with_leader(cluster_lamports, leader_pubkey, lamports_per_node);
GenesisBlock::new_with_leader(cluster_lamports, leader_pubkey, node_stakes[0]);
let (genesis_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
let leader_ledger_path = tmp_copy_blocktree!(&genesis_ledger_path);
let mut ledger_paths = vec![];
@ -65,7 +58,9 @@ impl LocalCluster {
);
let mut fullnodes = vec![leader_server];
let mut client = mk_client(&leader_node_info);
for _ in 0..(num_nodes - 1) {
for stake in &node_stakes[1..] {
// Must have enough tokens to fund vote account and set delegate
assert!(*stake > 2);
let validator_keypair = Arc::new(Keypair::new());
let voting_keypair = Keypair::new();
let validator_pubkey = validator_keypair.pubkey();
@ -74,12 +69,8 @@ impl LocalCluster {
ledger_paths.push(ledger_path.clone());
// Send each validator some lamports to vote
let validator_balance = Self::transfer(
&mut client,
&mint_keypair,
&validator_pubkey,
lamports_per_node,
);
let validator_balance =
Self::transfer(&mut client, &mint_keypair, &validator_pubkey, *stake);
info!(
"validator {} balance {}",
validator_pubkey, validator_balance
@ -89,7 +80,7 @@ impl LocalCluster {
&mut client,
&voting_keypair,
&validator_keypair,
lamports_per_node - 1,
stake - 1,
)
.unwrap();
let validator_server = Fullnode::new(
@ -102,7 +93,7 @@ impl LocalCluster {
);
fullnodes.push(validator_server);
}
discover(&leader_node_info, num_nodes).unwrap();
discover(&leader_node_info, node_stakes.len()).unwrap();
Self {
funding_keypair: mint_keypair,
entry_point_info: leader_node_info,
@ -116,11 +107,16 @@ impl LocalCluster {
node.exit();
}
}
pub fn close(&mut self) {
pub fn close_preserve_ledgers(&mut self) {
self.exit();
while let Some(node) = self.fullnodes.pop() {
node.join().unwrap();
}
}
pub fn close(&mut self) {
self.close_preserve_ledgers();
for path in &self.ledger_paths {
remove_dir_all(path).unwrap();
}
@ -204,7 +200,7 @@ impl LocalCluster {
impl Drop for LocalCluster {
fn drop(&mut self) {
self.close()
self.close();
}
}
@ -224,7 +220,7 @@ mod test {
solana_logger::setup();
let mut fullnode_exit = FullnodeConfig::default();
fullnode_exit.rpc_config.enable_fullnode_exit = true;
let cluster = LocalCluster::new_with_config(1, 100, 3, &fullnode_exit);
let cluster = LocalCluster::new_with_config(&[3], 100, &fullnode_exit);
drop(cluster)
}
}
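Hedged usage sketch of the reworked constructor and the new shutdown split: stakes are now passed per node, and `close_preserve_ledgers` stops the validators without deleting their ledger directories (dropping the cluster afterwards still removes them, since `Drop` calls `close`). The stake and lamport values below are illustrative.

use solana::fullnode::FullnodeConfig;
use solana::local_cluster::LocalCluster;

fn spin_up_and_inspect() {
    let fullnode_config = FullnodeConfig::default();
    // Three nodes, 100 lamports of stake each, funded from 10_000 cluster lamports.
    let mut cluster = LocalCluster::new_with_config(&[100, 100, 100], 10_000, &fullnode_config);

    // Stop the fullnodes but keep the ledgers on disk so a test can read them.
    cluster.close_preserve_ledgers();
    // The test added in this commit reads cluster.ledger_paths[1] as the
    // leader's ledger; the exact ordering is an implementation detail.
    let _leader_ledger = cluster.ledger_paths[1].clone();

    // Dropping `cluster` runs close(), which removes the ledger directories.
    drop(cluster);
}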

View File

@ -90,6 +90,7 @@ impl ReplayStage {
let active_banks = bank_forks.read().unwrap().active_banks();
trace!("active banks {:?}", active_banks);
let mut votable: Vec<u64> = vec![];
let mut is_tpu_bank_active = poh_recorder.lock().unwrap().bank().is_some();
for bank_slot in &active_banks {
let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
if bank.collector_id() != my_id {
@ -141,9 +142,13 @@ impl ReplayStage {
.lock()
.unwrap()
.reset(parent.tick_height(), parent.last_blockhash());
is_tpu_bank_active = false;
}
if !is_tpu_bank_active {
Self::start_leader(my_id, &bank_forks, &poh_recorder, &cluster_info);
}
Self::start_leader(my_id, &bank_forks, &poh_recorder, &cluster_info);
inc_new_counter_info!(
"replicate_stage-duration",
duration_as_ms(&now.elapsed()) as usize
@ -182,30 +187,36 @@ impl ReplayStage {
assert!(frozen.get(&poh_slot).is_none());
trace!("checking poh slot for leader {}", poh_slot);
if bank_forks.read().unwrap().get(poh_slot).is_none() {
let next_leader = leader_schedule_utils::slot_leader_at(poh_slot, parent);
debug!(
"me: {} leader {} at poh slot {}",
my_id, next_leader, poh_slot
);
cluster_info.write().unwrap().set_leader(next_leader);
if next_leader == my_id {
debug!("starting tpu for slot {}", poh_slot);
let tpu_bank = Bank::new_from_parent(parent, my_id, poh_slot);
bank_forks.write().unwrap().insert(poh_slot, tpu_bank);
if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() {
assert_eq!(
bank_forks.read().unwrap().working_bank().slot(),
tpu_bank.slot()
);
leader_schedule_utils::slot_leader_at(poh_slot, parent)
.map(|next_leader| {
debug!(
"poh_recorder new working bank: me: {} next_slot: {} next_leader: {}",
my_id,
tpu_bank.slot(),
next_leader
"me: {} leader {} at poh slot {}",
my_id, next_leader, poh_slot
);
poh_recorder.lock().unwrap().set_bank(&tpu_bank);
}
}
cluster_info.write().unwrap().set_leader(next_leader);
if next_leader == my_id {
debug!("starting tpu for slot {}", poh_slot);
let tpu_bank = Bank::new_from_parent(parent, my_id, poh_slot);
bank_forks.write().unwrap().insert(poh_slot, tpu_bank);
if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() {
assert_eq!(
bank_forks.read().unwrap().working_bank().slot(),
tpu_bank.slot()
);
debug!(
"poh_recorder new working bank: me: {} next_slot: {} next_leader: {}",
my_id,
tpu_bank.slot(),
next_leader
);
poh_recorder.lock().unwrap().set_bank(&tpu_bank);
}
}
})
.or_else(|| {
error!("No next leader found");
None
});
}
} else {
error!("No frozen banks available!");
@ -308,7 +319,7 @@ impl ReplayStage {
trace!("child already active {}", child_id);
continue;
}
let leader = leader_schedule_utils::slot_leader_at(child_id, &parent_bank);
let leader = leader_schedule_utils::slot_leader_at(child_id, &parent_bank).unwrap();
info!("new fork:{} parent:{}", child_id, parent_id);
forks.insert(
child_id,
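The control-flow change above, restated as a hedged sketch with placeholder types: `PohHandle` stands in for the locked `PohRecorder`, and `start_leader` for `ReplayStage::start_leader`. The leader path only runs when the PoH recorder is not already driving a bank, and a fork reset clears that state first.

// Placeholder for the slot of the PohRecorder's current working bank, if any.
struct PohHandle {
    working_bank_slot: Option<u64>,
}

fn start_leader(_poh: &mut PohHandle) {
    // Stand-in for ReplayStage::start_leader: may install a new TPU bank.
}

fn replay_pass(poh: &mut PohHandle, voted_on_new_fork: bool) {
    // Snapshot whether the TPU already has a bank before replaying forks.
    let mut is_tpu_bank_active = poh.working_bank_slot.is_some();

    if voted_on_new_fork {
        // Voting resets PoH onto the voted fork and drops the working bank,
        // so the TPU is no longer active for this pass.
        poh.working_bank_slot = None;
        is_tpu_bank_active = false;
    }

    // Only attempt to become leader when PoH is not already recording a bank;
    // this is the TVU/PoH-recorder synchronization the commit title refers to.
    if !is_tpu_bank_active {
        start_leader(poh);
    }
}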

View File

@ -3,6 +3,10 @@ extern crate solana;
use solana::cluster_tests;
use solana::fullnode::FullnodeConfig;
use solana::local_cluster::LocalCluster;
use solana::poh_service::PohServiceConfig;
use solana_sdk::timing::{DEFAULT_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT};
use std::thread::sleep;
use std::time::Duration;
#[test]
fn test_spend_and_verify_all_nodes_1() {
@ -55,7 +59,7 @@ fn test_fullnode_exit_2() {
let num_nodes = 2;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.rpc_config.enable_fullnode_exit = true;
let local = LocalCluster::new_with_config(num_nodes, 10_000, 100, &fullnode_config);
let local = LocalCluster::new_with_config(&[100; 2], 10_000, &fullnode_config);
cluster_tests::fullnode_exit(&local.entry_point_info, num_nodes);
}
@ -64,7 +68,7 @@ fn test_leader_failure_2() {
let num_nodes = 2;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.rpc_config.enable_fullnode_exit = true;
let local = LocalCluster::new_with_config(num_nodes, 10_000, 100, &fullnode_config);
let local = LocalCluster::new_with_config(&[100; 2], 10_000, &fullnode_config);
cluster_tests::kill_entry_and_spend_and_verify_rest(
&local.entry_point_info,
&local.funding_keypair,
@ -77,10 +81,31 @@ fn test_leader_failure_3() {
let num_nodes = 3;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.rpc_config.enable_fullnode_exit = true;
let local = LocalCluster::new_with_config(num_nodes, 10_000, 100, &fullnode_config);
let local = LocalCluster::new_with_config(&[100; 3], 10_000, &fullnode_config);
cluster_tests::kill_entry_and_spend_and_verify_rest(
&local.entry_point_info,
&local.funding_keypair,
num_nodes,
);
}
#[test]
fn test_two_unbalanced_stakes() {
let mut fullnode_config = FullnodeConfig::default();
let num_ticks_per_second = 100;
fullnode_config.tick_config =
PohServiceConfig::Sleep(Duration::from_millis(100 / num_ticks_per_second));
fullnode_config.rpc_config.enable_fullnode_exit = true;
let mut cluster = LocalCluster::new_with_config(&[999_990, 3], 1_000_000, &fullnode_config);
let num_epochs_to_sleep = 10;
let num_ticks_to_sleep = num_epochs_to_sleep * DEFAULT_TICKS_PER_SLOT * DEFAULT_SLOTS_PER_EPOCH;
sleep(Duration::from_millis(
num_ticks_to_sleep / num_ticks_per_second * 100,
));
cluster.close_preserve_ledgers();
let leader_ledger = cluster.ledger_paths[1].clone();
cluster_tests::verify_ledger_ticks(&leader_ledger, DEFAULT_TICKS_PER_SLOT as usize);
drop(cluster);
}
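The sleep in `test_two_unbalanced_stakes` is easier to read as a unit check: the PoH service is configured to sleep `100 / num_ticks_per_second` milliseconds per tick, and the test then waits roughly that per-tick interval times the number of ticks it wants to see. A small sketch of the same arithmetic (the 5_120 tick count is illustrative, not the product of the SDK constants; integer division rounds slightly, which is harmless here):

fn configured_tick_sleep_ms(num_ticks_per_second: u64) -> u64 {
    // Matches PohServiceConfig::Sleep(Duration::from_millis(100 / num_ticks_per_second)).
    100 / num_ticks_per_second
}

fn test_sleep_ms(num_ticks_to_sleep: u64, num_ticks_per_second: u64) -> u64 {
    // Matches the sleep in the test: ticks / (ticks per second) * 100 ms,
    // i.e. approximately num_ticks_to_sleep * configured_tick_sleep_ms.
    num_ticks_to_sleep / num_ticks_per_second * 100
}

fn main() {
    let num_ticks_per_second = 100;
    assert_eq!(configured_tick_sleep_ms(num_ticks_per_second), 1); // 1 ms per tick
    // Waiting for an illustrative 5_120 ticks at 1 ms/tick sleeps for about 5.1 seconds.
    assert_eq!(test_sleep_ms(5_120, num_ticks_per_second), 5_100);
}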