diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs index 6160181662..fd8b89fdff 100644 --- a/core/src/blocktree_processor.rs +++ b/core/src/blocktree_processor.rs @@ -204,7 +204,7 @@ pub fn process_blocktree( if next_meta.is_full() { let next_bank = Arc::new(Bank::new_from_parent( &bank, - leader_schedule_utils::slot_leader_at(next_slot, &bank), + leader_schedule_utils::slot_leader_at(next_slot, &bank).unwrap(), next_slot, )); trace!("Add child bank for slot={}", next_slot); diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 41996262a1..6463123921 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -43,16 +43,8 @@ impl Broadcast { blocktree: &Arc, ) -> Result<()> { let timer = Duration::new(1, 0); - let (bank, entries) = receiver.recv_timeout(timer)?; - let mut broadcast_table = cluster_info - .read() - .unwrap() - .sorted_tvu_peers(&staking_utils::delegated_stakes(&bank)); - // Layer 1, leader nodes are limited to the fanout size. - broadcast_table.truncate(DATA_PLANE_FANOUT); - inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1); - - let max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1; + let (mut bank, entries) = receiver.recv_timeout(timer)?; + let mut max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1; let now = Instant::now(); let mut num_entries = entries.len(); @@ -60,17 +52,35 @@ impl Broadcast { let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0); ventries.push(entries); - while let Ok((same_bank, entries)) = receiver.try_recv() { - num_entries += entries.len(); - last_tick = entries.last().map(|v| v.1).unwrap_or(0); - ventries.push(entries); - assert!(last_tick <= max_tick_height); - assert!(same_bank.slot() == bank.slot()); - if last_tick == max_tick_height { - break; + assert!(last_tick <= max_tick_height,); + if last_tick != max_tick_height { + while let Ok((same_bank, entries)) = receiver.try_recv() { + // If the bank changed, that implies the previous slot was interrupted and we do not have to + // broadcast its entries. + if same_bank.slot() != bank.slot() { + num_entries = 0; + ventries.clear(); + bank = same_bank.clone(); + max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1; + } + num_entries += entries.len(); + last_tick = entries.last().map(|v| v.1).unwrap_or(0); + ventries.push(entries); + assert!(last_tick <= max_tick_height,); + if last_tick == max_tick_height { + break; + } } } + let mut broadcast_table = cluster_info + .read() + .unwrap() + .sorted_tvu_peers(&staking_utils::delegated_stakes(&bank)); + // Layer 1, leader nodes are limited to the fanout size. + broadcast_table.truncate(DATA_PLANE_FANOUT); + + inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1); inc_new_counter_info!("broadcast_service-entries_received", num_entries); let to_blobs_start = Instant::now(); diff --git a/core/src/cluster_tests.rs b/core/src/cluster_tests.rs index 2ec8ef95b2..6292ffa60e 100644 --- a/core/src/cluster_tests.rs +++ b/core/src/cluster_tests.rs @@ -1,10 +1,13 @@ +use crate::blocktree::Blocktree; /// Cluster independant integration tests /// /// All tests must start from an entry point and a funding keypair and /// discover the rest of the network. 
use crate::client::mk_client; use crate::contact_info::ContactInfo; +use crate::entry::{Entry, EntrySlice}; use crate::gossip_service::discover; +use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::timing::{DEFAULT_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT, NUM_TICKS_PER_SECOND}; @@ -59,6 +62,39 @@ pub fn fullnode_exit(entry_point_info: &ContactInfo, nodes: usize) { } } +pub fn verify_ledger_ticks(ledger_path: &str, ticks_per_slot: usize) { + let ledger = Blocktree::open(ledger_path).unwrap(); + let zeroth_slot = ledger.get_slot_entries(0, 0, None).unwrap(); + let last_id = zeroth_slot.last().unwrap().hash; + let next_slots = ledger.get_slots_since(&[0]).unwrap().remove(&0).unwrap(); + let mut pending_slots: Vec<_> = next_slots + .into_iter() + .map(|slot| (slot, 0, last_id)) + .collect(); + while !pending_slots.is_empty() { + let (slot, parent_slot, last_id) = pending_slots.pop().unwrap(); + let next_slots = ledger + .get_slots_since(&[slot]) + .unwrap() + .remove(&slot) + .unwrap(); + + // If you're not the last slot, you should have a full set of ticks + let should_verify_ticks = if !next_slots.is_empty() { + Some((slot - parent_slot) as usize * ticks_per_slot) + } else { + None + }; + + let last_id = verify_slot_ticks(&ledger, slot, &last_id, should_verify_ticks); + pending_slots.extend( + next_slots + .into_iter() + .map(|child_slot| (child_slot, slot, last_id)), + ); + } +} + pub fn kill_entry_and_spend_and_verify_rest( entry_point_info: &ContactInfo, funding_keypair: &Keypair, @@ -105,3 +141,23 @@ pub fn kill_entry_and_spend_and_verify_rest( } } } + +fn get_and_verify_slot_entries(blocktree: &Blocktree, slot: u64, last_entry: &Hash) -> Vec { + let entries = blocktree.get_slot_entries(slot, 0, None).unwrap(); + assert!(entries.verify(last_entry)); + entries +} + +fn verify_slot_ticks( + blocktree: &Blocktree, + slot: u64, + last_entry: &Hash, + expected_num_ticks: Option, +) -> Hash { + let entries = get_and_verify_slot_entries(blocktree, slot, last_entry); + let num_ticks: usize = entries.iter().map(|entry| entry.is_tick() as usize).sum(); + if let Some(expected_num_ticks) = expected_num_ticks { + assert_eq!(num_ticks, expected_num_ticks); + } + entries.last().unwrap().hash +} diff --git a/core/src/leader_schedule_utils.rs b/core/src/leader_schedule_utils.rs index 7cf68b9b70..acbb670357 100644 --- a/core/src/leader_schedule_utils.rs +++ b/core/src/leader_schedule_utils.rs @@ -4,14 +4,14 @@ use solana_runtime::bank::Bank; use solana_sdk::pubkey::Pubkey; /// Return the leader schedule for the given epoch. 
-fn leader_schedule(epoch_height: u64, bank: &Bank) -> LeaderSchedule { - let stakes = staking_utils::delegated_stakes_at_epoch(bank, epoch_height) - .expect("epoch state must exist"); - let mut seed = [0u8; 32]; - seed[0..8].copy_from_slice(&epoch_height.to_le_bytes()); - let mut stakes: Vec<_> = stakes.into_iter().collect(); - sort_stakes(&mut stakes); - LeaderSchedule::new(&stakes, seed, bank.get_slots_in_epoch(epoch_height)) +fn leader_schedule(epoch_height: u64, bank: &Bank) -> Option { + staking_utils::delegated_stakes_at_epoch(bank, epoch_height).map(|stakes| { + let mut seed = [0u8; 32]; + seed[0..8].copy_from_slice(&epoch_height.to_le_bytes()); + let mut stakes: Vec<_> = stakes.into_iter().collect(); + sort_stakes(&mut stakes); + LeaderSchedule::new(&stakes, seed, bank.get_slots_in_epoch(epoch_height)) + }) } fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) { @@ -31,11 +31,10 @@ fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) { } /// Return the leader for the given slot. -pub fn slot_leader_at(slot: u64, bank: &Bank) -> Pubkey { +pub fn slot_leader_at(slot: u64, bank: &Bank) -> Option { let (epoch, slot_index) = bank.get_epoch_and_slot_index(slot); - let leader_schedule = leader_schedule(epoch, bank); - leader_schedule[slot_index as usize] + leader_schedule(epoch, bank).map(|leader_schedule| leader_schedule[slot_index as usize]) } // Returns the number of ticks remaining from the specified tick_height to the end of the @@ -85,7 +84,7 @@ mod tests { ) .0; let bank = Bank::new(&genesis_block); - assert_eq!(slot_leader_at(bank.slot(), &bank), pubkey); + assert_eq!(slot_leader_at(bank.slot(), &bank).unwrap(), pubkey); } #[test] diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs index 9e6c9cb310..767c841aac 100644 --- a/core/src/local_cluster.rs +++ b/core/src/local_cluster.rs @@ -21,33 +21,26 @@ pub struct LocalCluster { pub funding_keypair: Keypair, /// Entry point from which the rest of the network can be discovered pub entry_point_info: NodeInfo, + pub ledger_paths: Vec, fullnodes: Vec, - ledger_paths: Vec, } impl LocalCluster { pub fn new(num_nodes: usize, cluster_lamports: u64, lamports_per_node: u64) -> Self { - Self::new_with_config( - num_nodes, - cluster_lamports, - lamports_per_node, - &FullnodeConfig::default(), - ) + let stakes: Vec<_> = (0..num_nodes).map(|_| lamports_per_node).collect(); + Self::new_with_config(&stakes, cluster_lamports, &FullnodeConfig::default()) } pub fn new_with_config( - num_nodes: usize, + node_stakes: &[u64], cluster_lamports: u64, - lamports_per_node: u64, fullnode_config: &FullnodeConfig, ) -> Self { - // Must have enough tokens to fund vote account and set delegate - assert!(lamports_per_node > 2); let leader_keypair = Arc::new(Keypair::new()); let leader_pubkey = leader_keypair.pubkey(); let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let (genesis_block, mint_keypair) = - GenesisBlock::new_with_leader(cluster_lamports, leader_pubkey, lamports_per_node); + GenesisBlock::new_with_leader(cluster_lamports, leader_pubkey, node_stakes[0]); let (genesis_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block); let leader_ledger_path = tmp_copy_blocktree!(&genesis_ledger_path); let mut ledger_paths = vec![]; @@ -65,7 +58,9 @@ impl LocalCluster { ); let mut fullnodes = vec![leader_server]; let mut client = mk_client(&leader_node_info); - for _ in 0..(num_nodes - 1) { + for stake in &node_stakes[1..] 
{ + // Must have enough tokens to fund vote account and set delegate + assert!(*stake > 2); let validator_keypair = Arc::new(Keypair::new()); let voting_keypair = Keypair::new(); let validator_pubkey = validator_keypair.pubkey(); @@ -74,12 +69,8 @@ impl LocalCluster { ledger_paths.push(ledger_path.clone()); // Send each validator some lamports to vote - let validator_balance = Self::transfer( - &mut client, - &mint_keypair, - &validator_pubkey, - lamports_per_node, - ); + let validator_balance = + Self::transfer(&mut client, &mint_keypair, &validator_pubkey, *stake); info!( "validator {} balance {}", validator_pubkey, validator_balance @@ -89,7 +80,7 @@ impl LocalCluster { &mut client, &voting_keypair, &validator_keypair, - lamports_per_node - 1, + stake - 1, ) .unwrap(); let validator_server = Fullnode::new( @@ -102,7 +93,7 @@ impl LocalCluster { ); fullnodes.push(validator_server); } - discover(&leader_node_info, num_nodes).unwrap(); + discover(&leader_node_info, node_stakes.len()).unwrap(); Self { funding_keypair: mint_keypair, entry_point_info: leader_node_info, @@ -116,11 +107,16 @@ impl LocalCluster { node.exit(); } } - pub fn close(&mut self) { + + pub fn close_preserve_ledgers(&mut self) { self.exit(); while let Some(node) = self.fullnodes.pop() { node.join().unwrap(); } + } + + pub fn close(&mut self) { + self.close_preserve_ledgers(); for path in &self.ledger_paths { remove_dir_all(path).unwrap(); } @@ -204,7 +200,7 @@ impl LocalCluster { impl Drop for LocalCluster { fn drop(&mut self) { - self.close() + self.close(); } } @@ -224,7 +220,7 @@ mod test { solana_logger::setup(); let mut fullnode_exit = FullnodeConfig::default(); fullnode_exit.rpc_config.enable_fullnode_exit = true; - let cluster = LocalCluster::new_with_config(1, 100, 3, &fullnode_exit); + let cluster = LocalCluster::new_with_config(&[3], 100, &fullnode_exit); drop(cluster) } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index abf82f662e..1d52d39c78 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -90,6 +90,7 @@ impl ReplayStage { let active_banks = bank_forks.read().unwrap().active_banks(); trace!("active banks {:?}", active_banks); let mut votable: Vec = vec![]; + let mut is_tpu_bank_active = poh_recorder.lock().unwrap().bank().is_some(); for bank_slot in &active_banks { let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone(); if bank.collector_id() != my_id { @@ -141,9 +142,13 @@ impl ReplayStage { .lock() .unwrap() .reset(parent.tick_height(), parent.last_blockhash()); + is_tpu_bank_active = false; + } + + if !is_tpu_bank_active { + Self::start_leader(my_id, &bank_forks, &poh_recorder, &cluster_info); } - Self::start_leader(my_id, &bank_forks, &poh_recorder, &cluster_info); inc_new_counter_info!( "replicate_stage-duration", duration_as_ms(&now.elapsed()) as usize @@ -182,30 +187,36 @@ impl ReplayStage { assert!(frozen.get(&poh_slot).is_none()); trace!("checking poh slot for leader {}", poh_slot); if bank_forks.read().unwrap().get(poh_slot).is_none() { - let next_leader = leader_schedule_utils::slot_leader_at(poh_slot, parent); - debug!( - "me: {} leader {} at poh slot {}", - my_id, next_leader, poh_slot - ); - cluster_info.write().unwrap().set_leader(next_leader); - if next_leader == my_id { - debug!("starting tpu for slot {}", poh_slot); - let tpu_bank = Bank::new_from_parent(parent, my_id, poh_slot); - bank_forks.write().unwrap().insert(poh_slot, tpu_bank); - if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() { - assert_eq!( - 
bank_forks.read().unwrap().working_bank().slot(), - tpu_bank.slot() - ); + leader_schedule_utils::slot_leader_at(poh_slot, parent) + .map(|next_leader| { debug!( - "poh_recorder new working bank: me: {} next_slot: {} next_leader: {}", - my_id, - tpu_bank.slot(), - next_leader + "me: {} leader {} at poh slot {}", + my_id, next_leader, poh_slot ); - poh_recorder.lock().unwrap().set_bank(&tpu_bank); - } - } + cluster_info.write().unwrap().set_leader(next_leader); + if next_leader == my_id { + debug!("starting tpu for slot {}", poh_slot); + let tpu_bank = Bank::new_from_parent(parent, my_id, poh_slot); + bank_forks.write().unwrap().insert(poh_slot, tpu_bank); + if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() { + assert_eq!( + bank_forks.read().unwrap().working_bank().slot(), + tpu_bank.slot() + ); + debug!( + "poh_recorder new working bank: me: {} next_slot: {} next_leader: {}", + my_id, + tpu_bank.slot(), + next_leader + ); + poh_recorder.lock().unwrap().set_bank(&tpu_bank); + } + } + }) + .or_else(|| { + error!("No next leader found"); + None + }); } } else { error!("No frozen banks available!"); @@ -308,7 +319,7 @@ impl ReplayStage { trace!("child already active {}", child_id); continue; } - let leader = leader_schedule_utils::slot_leader_at(child_id, &parent_bank); + let leader = leader_schedule_utils::slot_leader_at(child_id, &parent_bank).unwrap(); info!("new fork:{} parent:{}", child_id, parent_id); forks.insert( child_id, diff --git a/tests/local_cluster.rs b/tests/local_cluster.rs index 807021a873..842dc6dd00 100644 --- a/tests/local_cluster.rs +++ b/tests/local_cluster.rs @@ -3,6 +3,10 @@ extern crate solana; use solana::cluster_tests; use solana::fullnode::FullnodeConfig; use solana::local_cluster::LocalCluster; +use solana::poh_service::PohServiceConfig; +use solana_sdk::timing::{DEFAULT_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT}; +use std::thread::sleep; +use std::time::Duration; #[test] fn test_spend_and_verify_all_nodes_1() { @@ -55,7 +59,7 @@ fn test_fullnode_exit_2() { let num_nodes = 2; let mut fullnode_config = FullnodeConfig::default(); fullnode_config.rpc_config.enable_fullnode_exit = true; - let local = LocalCluster::new_with_config(num_nodes, 10_000, 100, &fullnode_config); + let local = LocalCluster::new_with_config(&[100; 2], 10_000, &fullnode_config); cluster_tests::fullnode_exit(&local.entry_point_info, num_nodes); } @@ -64,7 +68,7 @@ fn test_leader_failure_2() { let num_nodes = 2; let mut fullnode_config = FullnodeConfig::default(); fullnode_config.rpc_config.enable_fullnode_exit = true; - let local = LocalCluster::new_with_config(num_nodes, 10_000, 100, &fullnode_config); + let local = LocalCluster::new_with_config(&[100; 2], 10_000, &fullnode_config); cluster_tests::kill_entry_and_spend_and_verify_rest( &local.entry_point_info, &local.funding_keypair, @@ -77,10 +81,31 @@ fn test_leader_failure_3() { let num_nodes = 3; let mut fullnode_config = FullnodeConfig::default(); fullnode_config.rpc_config.enable_fullnode_exit = true; - let local = LocalCluster::new_with_config(num_nodes, 10_000, 100, &fullnode_config); + let local = LocalCluster::new_with_config(&[100; 3], 10_000, &fullnode_config); cluster_tests::kill_entry_and_spend_and_verify_rest( &local.entry_point_info, &local.funding_keypair, num_nodes, ); } + +#[test] +fn test_two_unbalanced_stakes() { + let mut fullnode_config = FullnodeConfig::default(); + let num_ticks_per_second = 100; + fullnode_config.tick_config = + PohServiceConfig::Sleep(Duration::from_millis(100 / 
num_ticks_per_second)); + fullnode_config.rpc_config.enable_fullnode_exit = true; + let mut cluster = LocalCluster::new_with_config(&[999_990, 3], 1_000_000, &fullnode_config); + let num_epochs_to_sleep = 10; + let num_ticks_to_sleep = num_epochs_to_sleep * DEFAULT_TICKS_PER_SLOT * DEFAULT_SLOTS_PER_EPOCH; + sleep(Duration::from_millis( + num_ticks_to_sleep / num_ticks_per_second * 100, + )); + + cluster.close_preserve_ledgers(); + let leader_ledger = cluster.ledger_paths[1].clone(); + cluster_tests::verify_ledger_ticks(&leader_ledger, DEFAULT_TICKS_PER_SLOT as usize); + + drop(cluster); +}
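
The `broadcast_stage.rs` hunk above replaces the single `recv_timeout` with a drain loop: it keeps pulling entry batches for the current slot until that slot's last tick arrives, and if a batch tagged with a different bank shows up it treats the previous slot as interrupted and throws away everything gathered so far. A minimal, self-contained sketch of that accumulation step, with `SlotBank` and `TaggedEntries` as illustrative stand-ins for the real `Bank` and `(Entry, tick_height)` batches (not the crate's types):

```rust
use std::sync::mpsc::{channel, Receiver};

/// Stand-in for one batch of (entry, tick_height) pairs pulled off the broadcast channel.
type TaggedEntries = Vec<(String, u64)>;

/// Just enough of `Bank` to compute the slot's final tick height.
struct SlotBank {
    slot: u64,
    ticks_per_slot: u64,
}

impl SlotBank {
    fn max_tick_height(&self) -> u64 {
        (self.slot + 1) * self.ticks_per_slot - 1
    }
}

/// Accumulate entry batches for one slot until its last tick arrives. If a batch for a
/// different slot shows up, the previous slot was interrupted, so everything gathered
/// for it is discarded and accumulation restarts for the new slot.
fn accumulate_slot_entries(
    mut bank: SlotBank,
    first: TaggedEntries,
    receiver: &Receiver<(SlotBank, TaggedEntries)>,
) -> (SlotBank, Vec<TaggedEntries>) {
    let mut max_tick_height = bank.max_tick_height();
    let mut last_tick = first.last().map(|v| v.1).unwrap_or(0);
    let mut ventries = vec![first];
    assert!(last_tick <= max_tick_height);

    while last_tick != max_tick_height {
        match receiver.try_recv() {
            Ok((same_bank, entries)) => {
                if same_bank.slot != bank.slot {
                    // Slot changed: drop the interrupted slot's batches and start over.
                    ventries.clear();
                    bank = same_bank;
                    max_tick_height = bank.max_tick_height();
                }
                last_tick = entries.last().map(|v| v.1).unwrap_or(0);
                ventries.push(entries);
                assert!(last_tick <= max_tick_height);
            }
            Err(_) => break,
        }
    }
    (bank, ventries)
}

fn main() {
    let (sender, receiver) = channel();
    // Slot 0 gets interrupted by a batch for slot 1; only slot 1's batch should survive.
    sender
        .send((SlotBank { slot: 1, ticks_per_slot: 4 }, vec![("tick".to_string(), 7)]))
        .unwrap();
    let bank = SlotBank { slot: 0, ticks_per_slot: 4 };
    let (bank, batches) = accumulate_slot_entries(bank, vec![("tick".to_string(), 1)], &receiver);
    assert_eq!((bank.slot, batches.len()), (1, 1));
    println!("kept {} batch(es) for slot {}", batches.len(), bank.slot);
}
```

Note that in the patch the leader table lookup (`sorted_tvu_peers` plus the `DATA_PLANE_FANOUT` truncation) now happens after this loop, so the peer set is computed against whichever bank the loop settled on.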
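
The `leader_schedule_utils.rs` change makes `leader_schedule` and `slot_leader_at` fallible: when the bank has no stake information for the slot's epoch there is simply no schedule, and the caller decides what to do. The patch shows both styles, `unwrap()` where the epoch must exist (`blocktree_processor`, `generate_new_bank_forks`) and `map`/`or_else` with a logged error in the speculative `start_leader` path. A sketch of the same shape over a toy bank (`ToyBank` and the string leader ids are illustrative, not the crate's API):

```rust
use std::collections::HashMap;

/// Toy stand-in for a bank that may or may not know the stakes for a given epoch.
struct ToyBank {
    slots_per_epoch: u64,
    /// epoch -> leader schedule (one leader id per slot in the epoch)
    schedules: HashMap<u64, Vec<&'static str>>,
}

/// Mirrors the new fallible shape of `slot_leader_at`: a missing epoch yields `None`
/// instead of panicking deep inside the utility.
fn slot_leader_at(slot: u64, bank: &ToyBank) -> Option<&'static str> {
    let epoch = slot / bank.slots_per_epoch;
    let slot_index = (slot % bank.slots_per_epoch) as usize;
    bank.schedules.get(&epoch).map(|schedule| schedule[slot_index])
}

fn main() {
    let bank = ToyBank {
        slots_per_epoch: 4,
        schedules: HashMap::from([(0, vec!["alice", "bob", "alice", "bob"])]),
    };
    // Ledger replay knows the epoch exists, so it unwraps (as blocktree_processor now does).
    assert_eq!(slot_leader_at(2, &bank).unwrap(), "alice");
    // A speculative TPU start must tolerate a missing schedule (as start_leader now does).
    match slot_leader_at(9, &bank) {
        Some(leader) => println!("next leader: {}", leader),
        None => eprintln!("No next leader found"),
    }
}
```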
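
The new `cluster_tests::verify_ledger_ticks` helper walks the slot tree from slot 0 with an explicit stack, chains each slot's entry verification off the parent's last hash, and asserts a full complement of `(slot - parent_slot) * ticks_per_slot` ticks, but only for slots that already have children; a leaf slot may still be in progress. A simplified sketch of that walk over a toy in-memory ledger, with the hash-chain check (`entries.verify(last_entry)`) omitted and `ToyLedger` as an illustrative stand-in for `Blocktree`:

```rust
use std::collections::HashMap;

/// Toy stand-in for the blocktree: each slot stores (parent_slot, num_ticks, child_slots).
struct ToyLedger {
    slots: HashMap<u64, (u64, usize, Vec<u64>)>,
}

/// Walk the slot tree from the root and assert that every slot with children (i.e. every
/// slot known to be complete) carries (slot - parent_slot) * ticks_per_slot ticks.
fn verify_ledger_ticks(ledger: &ToyLedger, ticks_per_slot: usize) {
    let mut pending: Vec<(u64, u64)> = vec![(0, 0)]; // (slot, parent_slot)
    while let Some((slot, parent_slot)) = pending.pop() {
        let (_, num_ticks, children) = &ledger.slots[&slot];
        if !children.is_empty() && slot != 0 {
            let expected = (slot - parent_slot) as usize * ticks_per_slot;
            assert_eq!(*num_ticks, expected, "slot {} is short on ticks", slot);
        }
        pending.extend(children.iter().map(|&child| (child, slot)));
    }
}

fn main() {
    // Chain 0 -> 1 -> 3 with four ticks per slot; slot 3 is the still-growing tip, so its
    // tick count is not checked (once it has children it must hold (3 - 1) * 4 ticks).
    let mut slots = HashMap::new();
    slots.insert(0, (0, 4, vec![1]));
    slots.insert(1, (0, 4, vec![3]));
    slots.insert(3, (1, 5, vec![]));
    verify_ledger_ticks(&ToyLedger { slots }, 4);
    println!("ledger tick counts verified");
}
```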
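
`test_two_unbalanced_stakes` sizes its sleep from the tick period it just configured: with `PohServiceConfig::Sleep(Duration::from_millis(100 / num_ticks_per_second))` each tick takes `100 / num_ticks_per_second` ms, so covering roughly ten epochs of ticks costs `num_ticks_to_sleep / num_ticks_per_second * 100` ms of wall-clock time. A tiny helper mirroring that arithmetic, with placeholder constants standing in for `DEFAULT_TICKS_PER_SLOT` and `DEFAULT_SLOTS_PER_EPOCH` (not the crate's actual values):

```rust
use std::time::Duration;

/// Mirrors the sleep sizing in test_two_unbalanced_stakes: the configured tick period is
/// 100 / num_ticks_per_second ms, so num_ticks ticks take num_ticks * 100 / num_ticks_per_second ms.
fn sleep_for_ticks(num_ticks: u64, num_ticks_per_second: u64) -> Duration {
    Duration::from_millis(num_ticks / num_ticks_per_second * 100)
}

fn main() {
    // Placeholder sizing: 8 ticks per slot, 64 slots per epoch, ten epochs of ticks.
    let num_ticks = 10 * 8 * 64u64;
    // ~5.1 seconds with these toy numbers at 100 ticks per second.
    println!("sleep for {:?}", sleep_for_ticks(num_ticks, 100));
}
```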