Local Cluster Duplicate Switch Test (#32614)

* Add test for broken behavior in same batch

* tests

* redo test

* Important fixes to not immediately duplicate confirm by adding extra node

* Fixup merge

* PR comments

* Redo stakes

* clippy

* fixes

* Resolve conflicts

* add thread logging

* Fixup merge

* Fixup bugs

* Revert "add thread logging"

This reverts commit 9dc22401054b8f91f2b2aa3033e482996913febb.

* Hide scope

* Fixes

* Cleanup test_faulty_node

* More fixes

* Fixes

* Error logging

* Fix duplicate confirmed

* done

* PR comments

* Revert "Error logging"

This reverts commit 18953c36a5e865ecdd38bbf49b8d0502448087d2.

* PR comments

* nit
carllin 2023-08-08 19:29:39 -04:00 committed by GitHub
parent 8e4a9a94ed
commit d5faa6e8aa
7 changed files with 641 additions and 64 deletions
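At a high level, the change threads two new test hooks through the faulty-node machinery: `test_faulty_node` gains optional per-validator test configs and a custom leader schedule, and `BroadcastDuplicatesConfig` gains a pubkey-based partition plus an optional channel that reports each slot the leader broadcast duplicates for. A minimal sketch of the new wiring, assembled from the diffs below and not a verbatim excerpt (`victim_pubkey`, `node_stakes`, `validator_test_configs`, and `fixed_leader_schedule` are assumed to be in scope):

let (duplicate_slot_sender, duplicate_slot_receiver) = crossbeam_channel::unbounded();
let (mut cluster, _validator_keys) = test_faulty_node(
    BroadcastStageType::BroadcastDuplicates(BroadcastDuplicatesConfig {
        // Partition off explicit pubkeys instead of an amount of stake
        partition: ClusterPartition::Pubkey(vec![victim_pubkey]),
        // The faulty leader signals every slot it broadcast duplicates for
        duplicate_slot_sender: Some(duplicate_slot_sender),
    }),
    node_stakes,
    Some(validator_test_configs), // None keeps the old default-config behavior
    Some(fixed_leader_schedule),  // None keeps the old faulty-leader-only schedule
);
// Block until the faulty leader reports a duplicate slot
let dup_slot = duplicate_slot_receiver.recv().unwrap();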


@@ -3220,6 +3220,10 @@ impl Blockstore {
self.dead_slots_cf.delete(slot)
}
pub fn remove_slot_duplicate_proof(&self, slot: Slot) -> Result<()> {
self.duplicate_slots_cf.delete(slot)
}
pub fn store_duplicate_if_not_existing(
&self,
slot: Slot,
@@ -3233,6 +3237,15 @@ impl Blockstore {
}
}
pub fn get_first_duplicate_proof(&self) -> Option<(Slot, DuplicateSlotProof)> {
let mut iter = self
.db
.iter::<cf::DuplicateSlots>(IteratorMode::From(0, IteratorDirection::Forward))
.unwrap();
iter.next()
.map(|(slot, proof_bytes)| (slot, deserialize(&proof_bytes).unwrap()))
}
pub fn store_duplicate_slot(&self, slot: Slot, shred1: Vec<u8>, shred2: Vec<u8>) -> Result<()> {
let duplicate_slot_proof = DuplicateSlotProof::new(shred1, shred2);
self.duplicate_slots_cf.put(slot, &duplicate_slot_proof)
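Together with the existing `store_duplicate_slot`, these accessors let a test inspect and reset a node's duplicate-proof state. A small sketch, assuming an open `Blockstore` handle named `blockstore`:

// Drain every stored duplicate proof, e.g. before restarting a validator
// that should vote on the (formerly) duplicate slot again.
while let Some((slot, _proof)) = blockstore.get_first_duplicate_proof() {
    blockstore.remove_slot_duplicate_proof(slot).unwrap();
}
assert!(blockstore.get_first_duplicate_proof().is_none());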


@@ -2,6 +2,7 @@ use {
solana_client::thin_client::ThinClient,
solana_core::validator::{Validator, ValidatorConfig},
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
solana_ledger::shred::Shred,
solana_sdk::{pubkey::Pubkey, signature::Keypair},
solana_streamer::socket::SocketAddrSpace,
std::{path::PathBuf, sync::Arc},
@@ -62,4 +63,6 @@ pub trait Cluster {
config: ValidatorConfig,
socket_addr_space: SocketAddrSpace,
);
fn set_entry_point(&mut self, entry_point_info: ContactInfo);
fn send_shreds_to_validator(&self, dup_shreds: Vec<&Shred>, validator_key: &Pubkey);
}
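A typical use of the two new methods, mirroring the test added later in this commit (assumes a `cluster` implementing `Cluster`, a stored `DuplicateSlotProof` named `proof`, and a target `validator_pubkey`):

// Rebuild both versions of the conflicting shred from a stored proof and push
// them straight at one validator's TVU port, so that validator constructs its
// own duplicate proof.
let shred1 = Shred::new_from_serialized_shred(proof.shred1).unwrap();
let shred2 = Shred::new_from_serialized_shred(proof.shred2).unwrap();
cluster.send_shreds_to_validator(vec![&shred1, &shred2], &validator_pubkey);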


@@ -13,10 +13,10 @@ use {
},
solana_gossip::{
cluster_info::Node,
contact_info::{ContactInfo, LegacyContactInfo},
contact_info::{ContactInfo, LegacyContactInfo, Protocol},
gossip_service::discover_cluster,
},
solana_ledger::create_new_tmp_ledger,
solana_ledger::{create_new_tmp_ledger, shred::Shred},
solana_runtime::{
genesis_utils::{
create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo,
@@ -57,6 +57,7 @@ use {
collections::HashMap,
io::{Error, ErrorKind, Result},
iter,
net::UdpSocket,
path::{Path, PathBuf},
sync::{Arc, RwLock},
},
@@ -852,6 +853,10 @@ impl Cluster for LocalCluster {
(node, entry_point_info)
}
fn set_entry_point(&mut self, entry_point_info: ContactInfo) {
self.entry_point_info = entry_point_info;
}
fn restart_node(
&mut self,
pubkey: &Pubkey,
@@ -922,6 +927,20 @@ impl Cluster for LocalCluster {
fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo> {
self.validators.get(pubkey).map(|v| &v.info.contact_info)
}
fn send_shreds_to_validator(&self, dup_shreds: Vec<&Shred>, validator_key: &Pubkey) {
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let validator_tvu = self
.get_contact_info(validator_key)
.unwrap()
.tvu(Protocol::UDP)
.unwrap();
for shred in dup_shreds {
send_socket
.send_to(shred.payload().as_ref(), validator_tvu)
.unwrap();
}
}
}
impl Drop for LocalCluster {


@@ -77,6 +77,14 @@ pub fn restore_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<Tower> {
Tower::restore(&file_tower_storage, node_pubkey).ok()
}
pub fn remove_tower_if_exists(tower_path: &Path, node_pubkey: &Pubkey) {
let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf());
let filename = file_tower_storage.filename(node_pubkey);
if filename.exists() {
fs::remove_file(file_tower_storage.filename(node_pubkey)).unwrap();
}
}
pub fn remove_tower(tower_path: &Path, node_pubkey: &Pubkey) {
let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf());
fs::remove_file(file_tower_storage.filename(node_pubkey)).unwrap();
@@ -120,17 +128,18 @@ pub fn purge_slots_with_count(blockstore: &Blockstore, start_slot: Slot, slot_co
pub fn wait_for_last_vote_in_tower_to_land_in_ledger(
ledger_path: &Path,
node_pubkey: &Pubkey,
) -> Slot {
let (last_vote, _) = last_vote_in_tower(ledger_path, node_pubkey).unwrap();
loop {
// We reopen in a loop to make sure we get updates
let blockstore = open_blockstore(ledger_path);
if blockstore.is_full(last_vote) {
break;
) -> Option<Slot> {
last_vote_in_tower(ledger_path, node_pubkey).map(|(last_vote, _)| {
loop {
// We reopen in a loop to make sure we get updates
let blockstore = open_blockstore(ledger_path);
if blockstore.is_full(last_vote) {
break;
}
sleep(Duration::from_millis(100));
}
sleep(Duration::from_millis(100));
}
last_vote
last_vote
})
}
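The helper now returns `Option<Slot>` instead of panicking when the node has no tower yet (e.g. right after a restart with a cleared ledger). Call sites below either `.unwrap()` when a vote is guaranteed to exist, or poll until a tower appears; a sketch of the polling style, assuming `ledger_path` and `pubkey` are in scope:

let last_vote = loop {
    if let Some(slot) = wait_for_last_vote_in_tower_to_land_in_ledger(&ledger_path, &pubkey) {
        break slot;
    }
    sleep(Duration::from_millis(100));
};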
pub fn copy_blocks(end_slot: Slot, source: &Blockstore, dest: &Blockstore) {
@@ -390,40 +399,66 @@ pub fn run_cluster_partition<C>(
on_partition_resolved(&mut cluster, &mut context);
}
pub struct ValidatorTestConfig {
pub validator_keypair: Arc<Keypair>,
pub validator_config: ValidatorConfig,
pub in_genesis: bool,
}
pub fn test_faulty_node(
faulty_node_type: BroadcastStageType,
node_stakes: Vec<u64>,
validator_test_configs: Option<Vec<ValidatorTestConfig>>,
custom_leader_schedule: Option<FixedSchedule>,
) -> (LocalCluster, Vec<Arc<Keypair>>) {
solana_logger::setup_with_default("solana_local_cluster=info");
let num_nodes = node_stakes.len();
let mut validator_keys = Vec::with_capacity(num_nodes);
validator_keys.resize_with(num_nodes, || (Arc::new(Keypair::new()), true));
let validator_keys = validator_test_configs
.as_ref()
.map(|configs| {
configs
.iter()
.map(|config| (config.validator_keypair.clone(), config.in_genesis))
.collect()
})
.unwrap_or_else(|| {
let mut validator_keys = Vec::with_capacity(num_nodes);
validator_keys.resize_with(num_nodes, || (Arc::new(Keypair::new()), true));
validator_keys
});
assert_eq!(node_stakes.len(), num_nodes);
assert_eq!(validator_keys.len(), num_nodes);
// Use a fixed leader schedule so that only the faulty node gets leader slots.
let validator_to_slots = vec![(
validator_keys[0].0.as_ref().pubkey(),
solana_sdk::clock::DEFAULT_DEV_SLOTS_PER_EPOCH as usize,
)];
let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter());
let fixed_leader_schedule = Some(FixedSchedule {
leader_schedule: Arc::new(leader_schedule),
let fixed_leader_schedule = custom_leader_schedule.unwrap_or_else(|| {
// Use a fixed leader schedule so that only the faulty node gets leader slots.
let validator_to_slots = vec![(
validator_keys[0].0.as_ref().pubkey(),
solana_sdk::clock::DEFAULT_DEV_SLOTS_PER_EPOCH as usize,
)];
let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter());
FixedSchedule {
leader_schedule: Arc::new(leader_schedule),
}
});
let error_validator_config = ValidatorConfig {
broadcast_stage_type: faulty_node_type,
fixed_leader_schedule: fixed_leader_schedule.clone(),
..ValidatorConfig::default_for_test()
};
let mut validator_configs = Vec::with_capacity(num_nodes);
let mut validator_configs = validator_test_configs
.map(|configs| {
configs
.into_iter()
.map(|config| config.validator_config)
.collect()
})
.unwrap_or_else(|| {
let mut configs = Vec::with_capacity(num_nodes);
configs.resize_with(num_nodes, ValidatorConfig::default_for_test);
configs
});
// First validator is the bootstrap leader with the malicious broadcast logic.
validator_configs.push(error_validator_config);
validator_configs.resize_with(num_nodes, || ValidatorConfig {
fixed_leader_schedule: fixed_leader_schedule.clone(),
..ValidatorConfig::default_for_test()
});
validator_configs[0].broadcast_stage_type = faulty_node_type;
for config in &mut validator_configs {
config.fixed_leader_schedule = Some(fixed_leader_schedule.clone());
}
let mut cluster_config = ClusterConfig {
cluster_lamports: 10_000,


@@ -22,8 +22,10 @@ use {
ancestor_iterator::AncestorIterator,
bank_forks_utils,
blockstore::{entries_to_test_shreds, Blockstore},
blockstore_meta::DuplicateSlotProof,
blockstore_processor::ProcessOptions,
leader_schedule::FixedSchedule,
shred::Shred,
use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
},
solana_local_cluster::{
@@ -68,7 +70,8 @@ use {
},
solana_streamer::socket::SocketAddrSpace,
solana_turbine::broadcast_stage::{
broadcast_duplicates_run::BroadcastDuplicatesConfig, BroadcastStageType,
broadcast_duplicates_run::{BroadcastDuplicatesConfig, ClusterPartition},
BroadcastStageType,
},
solana_vote_program::{vote_state::MAX_LOCKOUT_HISTORY, vote_transaction},
std::{
@@ -1503,12 +1506,15 @@ fn test_snapshots_restart_validity() {
#[allow(unused_attributes)]
#[ignore]
fn test_fail_entry_verification_leader() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
let leader_stake = (DUPLICATE_THRESHOLD * 100.0) as u64 + 1;
let validator_stake1 = (100 - leader_stake) / 2;
let validator_stake2 = 100 - leader_stake - validator_stake1;
let (cluster, _) = test_faulty_node(
BroadcastStageType::FailEntryVerification,
vec![leader_stake, validator_stake1, validator_stake2],
None,
None,
);
cluster.check_for_new_roots(
16,
@@ -1522,8 +1528,14 @@ fn test_fail_entry_verification_leader() {
#[ignore]
#[allow(unused_attributes)]
fn test_fake_shreds_broadcast_leader() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
let node_stakes = vec![300, 100];
let (cluster, _) = test_faulty_node(BroadcastStageType::BroadcastFakeShreds, node_stakes);
let (cluster, _) = test_faulty_node(
BroadcastStageType::BroadcastFakeShreds,
node_stakes,
None,
None,
);
cluster.check_for_new_roots(
16,
"test_fake_shreds_broadcast_leader",
@@ -3255,7 +3267,8 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
{
// Find latest vote in B, and wait for it to reach blockstore
let b_last_vote =
wait_for_last_vote_in_tower_to_land_in_ledger(&val_b_ledger_path, &validator_b_pubkey);
wait_for_last_vote_in_tower_to_land_in_ledger(&val_b_ledger_path, &validator_b_pubkey)
.unwrap();
// Now we copy these blocks to A
let b_blockstore = open_blockstore(&val_b_ledger_path);
@@ -3465,11 +3478,13 @@ fn test_fork_choice_refresh_old_votes() {
let lighter_fork_latest_vote = wait_for_last_vote_in_tower_to_land_in_ledger(
&lighter_fork_ledger_path,
&context.lighter_fork_validator_key,
);
)
.unwrap();
let heaviest_fork_latest_vote = wait_for_last_vote_in_tower_to_land_in_ledger(
&heaviest_ledger_path,
&context.heaviest_validator_key,
);
)
.unwrap();
// Open ledgers
let smallest_blockstore = open_blockstore(&smallest_ledger_path);
@@ -3758,9 +3773,12 @@ fn test_duplicate_shreds_broadcast_leader() {
// 1) Set up the cluster
let (mut cluster, validator_keys) = test_faulty_node(
BroadcastStageType::BroadcastDuplicates(BroadcastDuplicatesConfig {
stake_partition: partition_node_stake,
partition: ClusterPartition::Stake(partition_node_stake),
duplicate_slot_sender: None,
}),
node_stakes,
None,
None,
);
// This is why it's important our node was last in `node_stakes`
@@ -4422,7 +4440,8 @@ fn test_slot_hash_expiry() {
let mut last_vote_on_a;
// Keep A running for a while longer so the majority fork has some decent size
loop {
last_vote_on_a = wait_for_last_vote_in_tower_to_land_in_ledger(&a_ledger_path, &a_pubkey);
last_vote_on_a =
wait_for_last_vote_in_tower_to_land_in_ledger(&a_ledger_path, &a_pubkey).unwrap();
if last_vote_on_a
>= common_ancestor_slot + 2 * (solana_sdk::slot_hashes::get_entries() as u64)
{
@@ -4448,7 +4467,8 @@ fn test_slot_hash_expiry() {
info!("Allowing B to fork");
loop {
let blockstore = open_blockstore(&b_ledger_path);
let last_vote = wait_for_last_vote_in_tower_to_land_in_ledger(&b_ledger_path, &b_pubkey);
let last_vote =
wait_for_last_vote_in_tower_to_land_in_ledger(&b_ledger_path, &b_pubkey).unwrap();
let mut ancestors = AncestorIterator::new(last_vote, &blockstore);
if let Some(index) = ancestors.position(|x| x == common_ancestor_slot) {
if index > 7 {
@@ -4662,7 +4682,8 @@ fn test_duplicate_with_pruned_ancestor() {
last_minority_vote
);
let last_minority_vote =
wait_for_last_vote_in_tower_to_land_in_ledger(&minority_ledger_path, &minority_pubkey);
wait_for_last_vote_in_tower_to_land_in_ledger(&minority_ledger_path, &minority_pubkey)
.unwrap();
let minority_validator_info = cluster.exit_node(&minority_pubkey);
info!("Truncating majority validator ledger to {fork_slot}");
@@ -4708,7 +4729,8 @@ fn test_duplicate_with_pruned_ancestor() {
}
let last_majority_vote =
wait_for_last_vote_in_tower_to_land_in_ledger(&majority_ledger_path, &majority_pubkey);
wait_for_last_vote_in_tower_to_land_in_ledger(&majority_ledger_path, &majority_pubkey)
.unwrap();
info!(
"Creating duplicate block built off of pruned branch for our node.
Last majority vote {last_majority_vote}, Last minority vote {last_minority_vote}"
@@ -5015,3 +5037,463 @@ fn test_boot_from_local_state() {
info!("Checking if validator{i} has the same snapshots as validator3... DONE");
}
}
// We want to simulate the following:
// /--- 1 --- 3 (duplicate block)
// 0
// \--- 2
//
// 1. > DUPLICATE_THRESHOLD of the nodes vote on some version of the duplicate block 3,
// but don't immediately duplicate confirm so they remove 3 from fork choice and reset PoH back to 1.
// 2. All the votes on 3 don't land because there are no further blocks building off 3.
// 3. Some < SWITCHING_THRESHOLD of nodes vote on 2, making it the heaviest fork because no votes on 3 landed
// 4. Nodes then see duplicate confirmation on 3.
// 5. Unless somebody builds off of 3 to include the duplicate confirmed votes, 2 will still be the heaviest.
// However, because 2 has < SWITCHING_THRESHOLD of the votes, people who voted on 3 can't switch, leading to a
// stall
#[test]
#[serial]
#[allow(unused_attributes)]
fn test_duplicate_shreds_switch_failure() {
fn wait_for_duplicate_fork_frozen(ledger_path: &Path, dup_slot: Slot) -> Hash {
// Ensure all the slots <= dup_slot are also full so we know we can replay up to dup_slot
// on restart
info!(
"Waiting to receive and replay entire duplicate fork with tip {}",
dup_slot
);
loop {
let duplicate_fork_validator_blockstore = open_blockstore(ledger_path);
if let Some(frozen_hash) = duplicate_fork_validator_blockstore.get_bank_hash(dup_slot) {
return frozen_hash;
}
sleep(Duration::from_millis(1000));
}
}
fn clear_ledger_and_tower(ledger_path: &Path, pubkey: &Pubkey, start_slot: Slot) {
remove_tower_if_exists(ledger_path, pubkey);
let blockstore = open_blockstore(ledger_path);
purge_slots_with_count(&blockstore, start_slot, 1000);
{
// Remove all duplicate proofs so that this node will vote on the `dup_slot`.
while let Some((proof_slot, _)) = blockstore.get_first_duplicate_proof() {
blockstore.remove_slot_duplicate_proof(proof_slot).unwrap();
}
}
}
fn restart_dup_validator(
cluster: &mut LocalCluster,
mut duplicate_fork_validator_info: ClusterValidatorInfo,
pubkey: &Pubkey,
dup_slot: Slot,
dup_shred1: &Shred,
dup_shred2: &Shred,
) {
let disable_turbine = Arc::new(AtomicBool::new(true));
duplicate_fork_validator_info.config.voting_disabled = false;
duplicate_fork_validator_info.config.turbine_disabled = disable_turbine.clone();
info!("Restarting node: {}", pubkey);
cluster.restart_node(
pubkey,
duplicate_fork_validator_info,
SocketAddrSpace::Unspecified,
);
let ledger_path = cluster.ledger_path(pubkey);
// Lift the partition after `pubkey` votes on the `dup_slot`
info!(
"Waiting on duplicate fork to vote on duplicate slot: {}",
dup_slot
);
loop {
let last_vote = last_vote_in_tower(&ledger_path, pubkey);
if let Some((latest_vote_slot, _hash)) = last_vote {
info!("latest vote: {}", latest_vote_slot);
if latest_vote_slot == dup_slot {
break;
}
}
sleep(Duration::from_millis(1000));
}
disable_turbine.store(false, Ordering::Relaxed);
// Send the validator the other version of the shred so it realizes the slot is duplicate
info!("Resending duplicate shreds to duplicate fork validator");
cluster.send_shreds_to_validator(vec![dup_shred1, dup_shred2], pubkey);
// Check the validator detected a duplicate proof
info!("Waiting on duplicate fork validator to see duplicate shreds and make a proof",);
loop {
let duplicate_fork_validator_blockstore = open_blockstore(&ledger_path);
if let Some(dup_proof) = duplicate_fork_validator_blockstore.get_first_duplicate_proof()
{
assert_eq!(dup_proof.0, dup_slot);
break;
}
sleep(Duration::from_millis(1000));
}
}
fn wait_for_duplicate_proof(ledger_path: &Path, dup_slot: Slot) -> Option<DuplicateSlotProof> {
for _ in 0..10 {
let duplicate_fork_validator_blockstore = open_blockstore(ledger_path);
if let Some((found_dup_slot, found_duplicate_proof)) =
duplicate_fork_validator_blockstore.get_first_duplicate_proof()
{
if found_dup_slot == dup_slot {
return Some(found_duplicate_proof);
};
}
sleep(Duration::from_millis(1000));
}
None
}
solana_logger::setup_with_default(RUST_LOG_FILTER);
let validator_keypairs = vec![
"28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
"2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8",
"4mx9yoFBeYasDKBGDWCTWGJdWuJCKbgqmuP8bN9umybCh5Jzngw7KQxe99Rf5uzfyzgba1i65rJW4Wqk7Ab5S8ye",
"2XFPyuzPuXMsPnkH98UNcQpfA7M4b2TUhRxcWEoWjy4M6ojQ7HGJSvotktEVbaq49Qxt16wUjdqvSJc6ecbFfZwj",
]
.iter()
.map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
.collect::<Vec<_>>();
let validators = validator_keypairs
.iter()
.map(|(kp, _)| kp.pubkey())
.collect::<Vec<_>>();
// Create 4 nodes:
// 1) Two nodes that sum to > DUPLICATE_THRESHOLD but < 2/3+ supermajority. It's important both
// of them individually have <= DUPLICATE_THRESHOLD to avoid duplicate confirming their own blocks
// immediately upon voting
// 2) One with <= SWITCHING_THRESHOLD so that the validators from 1) can't switch to it
// 3) One bad leader to make duplicate slots
let total_stake = 100 * DEFAULT_NODE_STAKE;
let target_switch_fork_stake = (total_stake as f64 * SWITCH_FORK_THRESHOLD) as u64;
// duplicate_fork_node1_stake + duplicate_fork_node2_stake > DUPLICATE_THRESHOLD. We don't want
// one node with > DUPLICATE_THRESHOLD, otherwise it would automatically duplicate confirm a
// slot when it votes, which would prevent it from resetting to an earlier ancestor when it
// later discovers that slot is a duplicate.
let duplicate_fork_node1_stake = (total_stake as f64 * DUPLICATE_THRESHOLD) as u64;
let duplicate_fork_node2_stake = 1;
let duplicate_leader_stake = total_stake
- target_switch_fork_stake
- duplicate_fork_node1_stake
- duplicate_fork_node2_stake;
assert!(
duplicate_fork_node1_stake + duplicate_fork_node2_stake
> (total_stake as f64 * DUPLICATE_THRESHOLD) as u64
);
assert!(duplicate_fork_node1_stake <= (total_stake as f64 * DUPLICATE_THRESHOLD) as u64);
assert!(duplicate_fork_node2_stake <= (total_stake as f64 * DUPLICATE_THRESHOLD) as u64);
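// For concreteness: assuming the mainline values DUPLICATE_THRESHOLD = 0.52 and
// SWITCH_FORK_THRESHOLD = 0.38, and normalizing total_stake to 100 units, this works out
// to duplicate_fork_node1 = 52 and duplicate_fork_node2 = 1 (together 53 > 52), with
// target_switch_fork = 38 and duplicate_leader = 100 - 38 - 52 - 1 = 9.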
let node_stakes = vec![
duplicate_leader_stake,
target_switch_fork_stake,
duplicate_fork_node1_stake,
duplicate_fork_node2_stake,
];
let (
// Has to be first in order to be picked as the duplicate leader
duplicate_leader_validator_pubkey,
target_switch_fork_validator_pubkey,
duplicate_fork_validator1_pubkey,
duplicate_fork_validator2_pubkey,
) = (validators[0], validators[1], validators[2], validators[3]);
info!(
"duplicate_fork_validator1_pubkey: {},
duplicate_fork_validator2_pubkey: {},
target_switch_fork_validator_pubkey: {},
duplicate_leader_validator_pubkey: {}",
duplicate_fork_validator1_pubkey,
duplicate_fork_validator2_pubkey,
target_switch_fork_validator_pubkey,
duplicate_leader_validator_pubkey
);
let validator_to_slots = vec![
(duplicate_leader_validator_pubkey, 50),
(target_switch_fork_validator_pubkey, 5),
// The ideal sequence of events for the `duplicate_fork_validator1_pubkey` validator would go:
// 1. Vote for duplicate block `D`
// 2. See `D` is duplicate, remove from fork choice and reset to ancestor `A`, potentially generating a fork off that ancestor
// 3. See `D` is duplicate confirmed, but because of the bug fixed by https://github.com/solana-labs/solana/pull/28172
// where we disallow resetting to a slot which matches the last vote slot, we still don't build off `D`,
// and continue building on `A`.
//
// The `target_switch_fork_validator_pubkey` fork is necessary in 2. to force the validator to stall
// trying to switch to that other fork, preventing it from making a freebie vote from `A` that would
// allow consensus to continue.
// It's important we don't give the `duplicate_fork_validator1_pubkey` leader slots until a certain number
// of slots have elapsed, to ensure:
// 1. It has no chance to make a block until after step 2, when it sees the block is duplicate.
// Otherwise, it would build its block on top of the duplicate block, possibly including a vote for the
// duplicate block. We want to avoid this because that would make fork choice pick the duplicate block.
// 2. The `duplicate_fork_validator1_pubkey` validator sees the target switch fork before it can make
// another vote on any fork it generates from `A` itself. Otherwise, it would make a freebie vote on its
// own fork from `A` and consensus would continue on that fork.
// Give the duplicate fork validator plenty of leader slots after the initial delay to prevent
// 1. Switch fork from getting locked out for too long
// 2. A lot of consecutive slots in which to build up lockout in tower and make new roots
// to resolve the partition
(duplicate_fork_validator1_pubkey, 500),
];
let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter());
// 1) Set up the cluster
let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
let validator_configs = validator_keypairs
.into_iter()
.map(|(validator_keypair, in_genesis)| {
let pubkey = validator_keypair.pubkey();
// Only allow the leader to vote so that no version gets duplicate confirmed.
// This is to avoid the leader dumping its own block.
let voting_disabled = { pubkey != duplicate_leader_validator_pubkey };
ValidatorTestConfig {
validator_keypair,
validator_config: ValidatorConfig {
voting_disabled,
..ValidatorConfig::default()
},
in_genesis,
}
})
.collect();
let (mut cluster, _validator_keypairs) = test_faulty_node(
BroadcastStageType::BroadcastDuplicates(BroadcastDuplicatesConfig {
partition: ClusterPartition::Pubkey(vec![
// Don't include the other dup validator here, otherwise
// this dup version will have enough to be duplicate confirmed and
// will cause the dup leader to try and dump its own slot,
// crashing before it can signal the duplicate slot via the
// `duplicate_slot_receiver` below
duplicate_fork_validator1_pubkey,
]),
duplicate_slot_sender: Some(duplicate_slot_sender),
}),
node_stakes,
Some(validator_configs),
Some(FixedSchedule {
leader_schedule: Arc::new(leader_schedule),
}),
);
// Kill two validators that might duplicate confirm the duplicate block
info!("Killing unnecessary validators");
let duplicate_fork_validator2_ledger_path =
cluster.ledger_path(&duplicate_fork_validator2_pubkey);
let duplicate_fork_validator2_info = cluster.exit_node(&duplicate_fork_validator2_pubkey);
let target_switch_fork_validator_ledger_path =
cluster.ledger_path(&target_switch_fork_validator_pubkey);
let mut target_switch_fork_validator_info =
cluster.exit_node(&target_switch_fork_validator_pubkey);
// 2) Wait for a duplicate slot to land on both validators and for the target switch
// fork validator to get another version of the slot. Also ensure all versions of
// the block are playable
let dup_slot;
loop {
dup_slot = duplicate_slot_receiver
.recv_timeout(Duration::from_millis(30_000))
.expect("Duplicate leader failed to make a duplicate slot in allotted time");
// Make sure both validators received and replayed the complete blocks
let dup_frozen_hash = wait_for_duplicate_fork_frozen(
&cluster.ledger_path(&duplicate_fork_validator1_pubkey),
dup_slot,
);
let original_frozen_hash = wait_for_duplicate_fork_frozen(
&cluster.ledger_path(&duplicate_leader_validator_pubkey),
dup_slot,
);
if original_frozen_hash != dup_frozen_hash {
break;
} else {
panic!(
"Duplicate leader and partition target got same hash: {}",
original_frozen_hash
);
}
}
// 3) Force `duplicate_fork_validator1_pubkey` to see a duplicate proof
info!("Waiting for duplicate proof for slot: {}", dup_slot);
let duplicate_proof = {
// Grab the other version of the slot from the `duplicate_leader_validator_pubkey`
// which we confirmed to have a different version of the frozen hash in the loop
// above
let ledger_path = cluster.ledger_path(&duplicate_leader_validator_pubkey);
let blockstore = open_blockstore(&ledger_path);
let dup_shred = blockstore
.get_data_shreds_for_slot(dup_slot, 0)
.unwrap()
.pop()
.unwrap();
info!(
"Sending duplicate shred: {:?} to {:?}",
dup_shred.signature(),
duplicate_fork_validator1_pubkey
);
cluster.send_shreds_to_validator(vec![&dup_shred], &duplicate_fork_validator1_pubkey);
wait_for_duplicate_proof(
&cluster.ledger_path(&duplicate_fork_validator1_pubkey),
dup_slot,
)
.unwrap_or_else(|| panic!("Duplicate proof for slot {} not found", dup_slot))
};
// 4) Kill all the validators
info!("Killing remaining validators");
let duplicate_fork_validator1_ledger_path =
cluster.ledger_path(&duplicate_fork_validator1_pubkey);
let duplicate_fork_validator1_info = cluster.exit_node(&duplicate_fork_validator1_pubkey);
let duplicate_leader_ledger_path = cluster.ledger_path(&duplicate_leader_validator_pubkey);
cluster.exit_node(&duplicate_leader_validator_pubkey);
let dup_shred1 = Shred::new_from_serialized_shred(duplicate_proof.shred1.clone()).unwrap();
let dup_shred2 = Shred::new_from_serialized_shred(duplicate_proof.shred2).unwrap();
assert_eq!(dup_shred1.slot(), dup_shred2.slot());
assert_eq!(dup_shred1.slot(), dup_slot);
// Purge everything including the `dup_slot` from the `target_switch_fork_validator_pubkey`
info!(
"Purging towers and ledgers for: {:?}",
target_switch_fork_validator_pubkey
);
Blockstore::destroy(&target_switch_fork_validator_ledger_path).unwrap();
{
let blockstore1 = open_blockstore(&duplicate_leader_ledger_path);
let blockstore2 = open_blockstore(&target_switch_fork_validator_ledger_path);
copy_blocks(dup_slot, &blockstore1, &blockstore2);
}
clear_ledger_and_tower(
&target_switch_fork_validator_ledger_path,
&target_switch_fork_validator_pubkey,
dup_slot,
);
info!(
"Purging towers and ledgers for: {:?}",
duplicate_fork_validator1_pubkey
);
clear_ledger_and_tower(
&duplicate_fork_validator1_ledger_path,
&duplicate_fork_validator1_pubkey,
dup_slot + 1,
);
info!(
"Purging towers and ledgers for: {:?}",
duplicate_fork_validator2_pubkey
);
// Copy validator 1's ledger to validator 2 so that they have the same version
// of the duplicate slot
clear_ledger_and_tower(
&duplicate_fork_validator2_ledger_path,
&duplicate_fork_validator2_pubkey,
dup_slot,
);
Blockstore::destroy(&duplicate_fork_validator2_ledger_path).unwrap();
{
let blockstore1 = open_blockstore(&duplicate_fork_validator1_ledger_path);
let blockstore2 = open_blockstore(&duplicate_fork_validator2_ledger_path);
copy_blocks(dup_slot, &blockstore1, &blockstore2);
}
// Set entrypoint to `target_switch_fork_validator_pubkey` so we can run discovery in gossip even without the
// bad leader
cluster.set_entry_point(target_switch_fork_validator_info.info.contact_info.clone());
// 5) Restart `target_switch_fork_validator_pubkey`, and ensure it votes on its own leader slot
// that's not descended from the duplicate slot
info!("Restarting switch fork node");
target_switch_fork_validator_info.config.voting_disabled = false;
cluster.restart_node(
&target_switch_fork_validator_pubkey,
target_switch_fork_validator_info,
SocketAddrSpace::Unspecified,
);
let target_switch_fork_validator_ledger_path =
cluster.ledger_path(&target_switch_fork_validator_pubkey);
info!("Waiting for switch fork to make block past duplicate fork");
loop {
let last_vote = wait_for_last_vote_in_tower_to_land_in_ledger(
&target_switch_fork_validator_ledger_path,
&target_switch_fork_validator_pubkey,
);
if let Some(latest_vote_slot) = last_vote {
if latest_vote_slot > dup_slot {
let blockstore = open_blockstore(&target_switch_fork_validator_ledger_path);
let ancestor_slots: HashSet<Slot> =
AncestorIterator::new_inclusive(latest_vote_slot, &blockstore).collect();
assert!(ancestor_slots.contains(&latest_vote_slot));
assert!(ancestor_slots.contains(&0));
assert!(!ancestor_slots.contains(&dup_slot));
break;
}
}
sleep(Duration::from_millis(1000));
}
// Now restart the duplicate validators.
// Start each node with the partition enabled so it doesn't see the `target_switch_fork_validator_pubkey`
// fork before voting on the duplicate block.
info!("Restarting duplicate fork node");
// Ensure `duplicate_fork_validator1_pubkey` votes before starting up `duplicate_fork_validator2_pubkey`,
// to prevent either of them from seeing `dup_slot` as duplicate confirmed before voting on it.
restart_dup_validator(
&mut cluster,
duplicate_fork_validator1_info,
&duplicate_fork_validator1_pubkey,
dup_slot,
&dup_shred1,
&dup_shred2,
);
restart_dup_validator(
&mut cluster,
duplicate_fork_validator2_info,
&duplicate_fork_validator2_pubkey,
dup_slot,
&dup_shred1,
&dup_shred2,
);
// Wait for the `duplicate_fork_validator1_pubkey` to make another leader block on top
// of the duplicate fork, one which includes its own vote for `dup_slot`. This
// should make the duplicate fork the heaviest.
info!("Waiting on duplicate fork validator to generate block on top of duplicate fork",);
loop {
let duplicate_fork_validator_blockstore =
open_blockstore(&cluster.ledger_path(&duplicate_fork_validator1_pubkey));
let meta = duplicate_fork_validator_blockstore
.meta(dup_slot)
.unwrap()
.unwrap();
if !meta.next_slots.is_empty() {
info!(
"duplicate fork validator saw new slots: {:?} on top of duplicate slot",
meta.next_slots
);
break;
}
sleep(Duration::from_millis(1000));
}
// Check that the cluster is making progress
cluster.check_for_new_roots(
16,
"test_duplicate_shreds_switch_failure",
SocketAddrSpace::Unspecified,
);
}


@@ -87,7 +87,7 @@ pub enum BroadcastStageReturnType {
ChannelDisconnected,
}
#[derive(PartialEq, Eq, Clone, Debug)]
#[derive(Clone, Debug)]
pub enum BroadcastStageType {
Standard,
FailEntryVerification,


@@ -1,6 +1,7 @@
use {
super::*,
crate::cluster_nodes::ClusterNodesCache,
crossbeam_channel::Sender,
itertools::Itertools,
solana_entry::entry::Entry,
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
@@ -16,10 +17,20 @@ pub const MINIMUM_DUPLICATE_SLOT: Slot = 20;
pub const DUPLICATE_RATE: usize = 10;
#[derive(PartialEq, Eq, Clone, Debug)]
pub enum ClusterPartition {
Stake(u64),
Pubkey(Vec<Pubkey>),
}
#[derive(Clone, Debug)]
pub struct BroadcastDuplicatesConfig {
/// Amount of stake (excluding the leader) to send different version of slots to.
/// Amount of stake (excluding the leader) or a set of validator pubkeys
/// to send a duplicate version of some slots to.
/// Note this is sampled from a list of stakes sorted least to greatest.
pub stake_partition: u64,
pub partition: ClusterPartition,
/// If passed `Some(receiver)`, will signal all the duplicate slots via the given
/// `receiver`
pub duplicate_slot_sender: Option<Sender<Slot>>,
}
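Because the config now carries a `crossbeam_channel::Sender`, which implements neither `PartialEq` nor `Eq`, the derives here and on `BroadcastStageType` in the previous file shrink to `Clone, Debug`. On the consuming side a test pairs the sender with a receiver; a sketch (the timeout value is arbitrary):

let (duplicate_slot_sender, duplicate_slot_receiver) = crossbeam_channel::unbounded();
// ...hand `Some(duplicate_slot_sender)` to BroadcastDuplicatesConfig...
let dup_slot: Slot = duplicate_slot_receiver
    .recv_timeout(Duration::from_millis(30_000))
    .expect("duplicate leader never reported a duplicate slot");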
#[derive(Clone)]
@@ -253,6 +264,9 @@ impl BroadcastRun for BroadcastDuplicatesRun {
.iter()
.all(|shred| shred.slot() == bank.slot()));
if let Some(duplicate_slot_sender) = &self.config.duplicate_slot_sender {
let _ = duplicate_slot_sender.send(bank.slot());
}
socket_sender.send((original_last_data_shred, None))?;
socket_sender.send((partition_last_data_shred, None))?;
}
@@ -280,20 +294,25 @@ impl BroadcastRun for BroadcastDuplicatesRun {
let self_pubkey = cluster_info.id();
// Create cluster partition.
let cluster_partition: HashSet<Pubkey> = {
let mut cumilative_stake = 0;
let epoch = root_bank.get_leader_schedule_epoch(slot);
root_bank
.epoch_staked_nodes(epoch)
.unwrap()
.iter()
.filter(|(pubkey, _)| **pubkey != self_pubkey)
.sorted_by_key(|(pubkey, stake)| (**stake, **pubkey))
.take_while(|(_, stake)| {
cumilative_stake += *stake;
cumilative_stake <= self.config.stake_partition
})
.map(|(pubkey, _)| *pubkey)
.collect()
match &self.config.partition {
ClusterPartition::Stake(partition_total_stake) => {
let mut cumulative_stake = 0;
let epoch = root_bank.get_leader_schedule_epoch(slot);
root_bank
.epoch_staked_nodes(epoch)
.unwrap()
.iter()
.filter(|(pubkey, _)| **pubkey != self_pubkey)
.sorted_by_key(|(pubkey, stake)| (**stake, **pubkey))
.take_while(|(_, stake)| {
cumulative_stake += *stake;
cumulative_stake <= *partition_total_stake
})
.map(|(pubkey, _)| *pubkey)
.collect()
}
ClusterPartition::Pubkey(pubkeys) => pubkeys.iter().cloned().collect(),
}
};
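For intuition, the `ClusterPartition::Stake` arm walks the epoch's staked nodes from smallest to largest stake and keeps taking nodes while the running total stays within the configured amount (the leader itself is filtered out first). A standalone toy version of the same selection rule, with made-up node names and stakes:

use itertools::Itertools;
use std::collections::HashSet;

fn stake_partition(stakes: &[(&'static str, u64)], target: u64) -> HashSet<&'static str> {
    let mut cumulative_stake = 0;
    stakes
        .iter()
        // Sort by (stake, name), smallest stake first, like the real code
        .sorted_by_key(|&&(name, stake)| (stake, name))
        // Keep taking nodes while the running total stays within the target
        .take_while(|&&(_, stake)| {
            cumulative_stake += stake;
            cumulative_stake <= target
        })
        .map(|&(name, _)| name)
        .collect()
}

fn main() {
    // With stakes {a: 10, b: 20, c: 70} and target 30: 10 <= 30 keeps a,
    // 10 + 20 <= 30 keeps b, and 10 + 20 + 70 > 30 stops the scan at c.
    assert_eq!(
        stake_partition(&[("a", 10), ("b", 20), ("c", 70)], 30),
        HashSet::from(["a", "b"])
    );
}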
// Broadcast data
@@ -316,10 +335,10 @@ impl BroadcastRun for BroadcastDuplicatesRun {
{
if cluster_partition.contains(node.pubkey()) {
info!(
"skipping node {} for original shred index {}, slot {}",
node.pubkey(),
"Not broadcasting original shred index {}, slot {} to partition node {}",
shred.index(),
shred.slot()
shred.slot(),
node.pubkey(),
);
return None;
}
@@ -337,6 +356,12 @@ impl BroadcastRun for BroadcastDuplicatesRun {
cluster_partition
.iter()
.filter_map(|pubkey| {
info!(
"Broadcasting partition shred index {}, slot {} to partition node {}",
shred.index(),
shred.slot(),
pubkey,
);
let tvu = cluster_info
.lookup_contact_info(pubkey, |node| node.tvu(Protocol::UDP))?
.ok()?;