diff --git a/core/src/consensus.rs b/core/src/consensus.rs
index 45754b9da..f06b4ad3a 100644
--- a/core/src/consensus.rs
+++ b/core/src/consensus.rs
@@ -11,7 +11,7 @@ use std::{
 pub const VOTE_THRESHOLD_DEPTH: usize = 8;
 pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64;
 
-#[derive(Default, Debug)]
+#[derive(Default, Debug, Clone)]
 pub struct StakeLockout {
     lockout: u64,
     stake: u64,
@@ -258,7 +258,7 @@ impl Tower {
     }
 
     // a slot is not recent if it's older than the newest vote we have
-    fn is_recent(&self, slot: u64) -> bool {
+    pub fn is_recent(&self, slot: u64) -> bool {
         if let Some(last_vote) = self.lockouts.votes.back() {
             if slot <= last_vote.slot {
                 return false;
@@ -316,7 +316,15 @@ impl Tower {
         let vote = lockouts.nth_recent_vote(self.threshold_depth);
         if let Some(vote) = vote {
             if let Some(fork_stake) = stake_lockouts.get(&vote.slot) {
-                (fork_stake.stake as f64 / total_staked as f64) > self.threshold_size
+                let lockout = fork_stake.stake as f64 / total_staked as f64;
+                trace!(
+                    "fork_stake {} {} {} {}",
+                    slot,
+                    lockout,
+                    fork_stake.stake,
+                    total_staked
+                );
+                lockout > self.threshold_size
             } else {
                 false
             }
diff --git a/core/src/lib.rs b/core/src/lib.rs
index a04a54966..d94bd5924 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -33,6 +33,7 @@ pub mod gossip_service;
 pub mod ledger_cleanup_service;
 pub mod local_vote_signer_service;
 pub mod packet;
+pub mod partition_cfg;
 pub mod poh_recorder;
 pub mod poh_service;
 pub mod recvmmsg;
diff --git a/core/src/partition_cfg.rs b/core/src/partition_cfg.rs
new file mode 100644
index 000000000..480176c19
--- /dev/null
+++ b/core/src/partition_cfg.rs
@@ -0,0 +1,92 @@
+use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
+use solana_ledger::shred::Shred;
+use solana_runtime::bank::Bank;
+use solana_sdk::pubkey::Pubkey;
+use solana_sdk::timing::timestamp;
+use std::collections::HashSet;
+use std::sync::Arc;
+use std::sync::RwLock;
+
+/// Configure a partition in the retransmit stage
+#[derive(Debug, Clone)]
+pub struct Partition {
+    pub num_partitions: usize,
+    pub my_partition: usize,
+    pub start_ts: u64,
+    pub end_ts: u64,
+    leaders: Arc<RwLock<Vec<Pubkey>>>,
+}
+impl Default for Partition {
+    fn default() -> Self {
+        Self {
+            num_partitions: 0,
+            my_partition: 0,
+            start_ts: 0,
+            end_ts: 0,
+            leaders: Arc::new(RwLock::new(vec![])),
+        }
+    }
+}
+
+#[derive(Default, Debug, Clone)]
+pub struct PartitionCfg {
+    partitions: Vec<Partition>,
+}
+
+impl PartitionCfg {
+    pub fn new(partitions: Vec<Partition>) -> Self {
+        Self { partitions }
+    }
+    pub fn is_connected(
+        &self,
+        bank: &Option<Arc<Bank>>,
+        leader_schedule_cache: &Arc<LeaderScheduleCache>,
+        shred: &Shred,
+    ) -> bool {
+        if bank.is_none() {
+            return true;
+        }
+        let bank = bank.as_ref().unwrap().clone();
+        let slot_leader_pubkey = leader_schedule_cache.slot_leader_at(shred.slot(), Some(&bank));
+        let slot_leader_pubkey = slot_leader_pubkey.unwrap_or_default();
+        let time = timestamp();
+        for p in &self.partitions {
+            let is_time = (p.start_ts <= time) && (time < p.end_ts);
+            if !is_time {
+                continue;
+            }
+            trace!("PARTITION_TEST partition time! {}", p.my_partition);
+            if p.num_partitions == 0 {
+                continue;
+            }
+            if p.leaders.read().unwrap().is_empty() {
+                let mut leader_vec = p.leaders.write().unwrap();
+                let mut leaders: Vec<Pubkey> = bank.vote_accounts().keys().cloned().collect();
+                leaders.sort();
+                *leader_vec = leaders;
+                warn!("PARTITION_TEST partition enabled {}", p.my_partition);
+            }
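+            // Split the sorted leader set into num_partitions contiguous slices and check
+            // whether the slot leader falls inside this node's slice; when the tail left
+            // over after a slice is smaller than a full slice, that slice absorbs it.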
{}", p.my_partition); + if p.num_partitions == 0 { + continue; + } + if p.leaders.read().unwrap().is_empty() { + let mut leader_vec = p.leaders.write().unwrap(); + let mut leaders: Vec = bank.vote_accounts().keys().cloned().collect(); + leaders.sort(); + *leader_vec = leaders; + warn!("PARTITION_TEST partition enabled {}", p.my_partition); + } + let is_connected: bool = { + let leaders = p.leaders.read().unwrap(); + let start = p.my_partition * leaders.len() / p.num_partitions; + let partition_size = leaders.len() / p.num_partitions; + let end = start + partition_size; + let end = if leaders.len() - end < partition_size { + leaders.len() + } else { + end + }; + let my_leaders: HashSet<_> = leaders[start..end].iter().collect(); + my_leaders.contains(&slot_leader_pubkey) + }; + if is_connected { + trace!("PARTITION_TEST connected {}", p.my_partition); + continue; + } + trace!("PARTITION_TEST not connected {}", p.my_partition); + return false; + } + trace!("PARTITION_TEST connected"); + true + } +} diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 91364a033..0151b1e30 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -42,6 +42,8 @@ use std::{ pub const MAX_ENTRY_RECV_PER_ITER: usize = 512; +type VoteAndPoHBank = (Option<(Arc, u64)>, Option>); + // Implement a destructor for the ReplayStage thread to signal it exited // even on panics struct Finalizer { @@ -76,6 +78,20 @@ struct ReplaySlotStats { replay_start: Instant, } +#[derive(Debug, Clone, Default)] +struct ForkStats { + weight: u128, + total_staked: u64, + slot: Slot, + block_height: u64, + has_voted: bool, + is_recent: bool, + vote_threshold: bool, + is_locked_out: bool, + stake_lockouts: HashMap, + computed: bool, +} + impl ReplaySlotStats { pub fn new(slot: Slot) -> Self { Self { @@ -123,6 +139,7 @@ struct ForkProgress { started_ms: u64, is_dead: bool, stats: ReplaySlotStats, + fork_stats: ForkStats, } impl ForkProgress { @@ -135,6 +152,7 @@ impl ForkProgress { started_ms: timing::timestamp(), is_dead: false, stats: ReplaySlotStats::new(slot), + fork_stats: ForkStats::default(), } } } @@ -186,7 +204,8 @@ impl ReplayStage { let _exit = Finalizer::new(exit_.clone()); let mut progress = HashMap::new(); let mut current_leader = None; - + let mut last_reset = Hash::default(); + let mut partition = false; loop { let now = Instant::now(); // Stop getting entries if we get exit signal @@ -211,51 +230,68 @@ impl ReplayStage { ); let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); - let votable = Self::generate_votable_banks( - &ancestors, - &bank_forks, - &tower, - &mut progress, - ); - - if let Some((_, bank, _, total_staked)) = votable.into_iter().last() { - subscriptions.notify_subscribers(bank.slot(), &bank_forks); - - if let Some(votable_leader) = - leader_schedule_cache.slot_leader_at(bank.slot(), Some(&bank)) - { - Self::log_leader_change( - &my_pubkey, - bank.slot(), - &mut current_leader, - &votable_leader, - ); + loop { + let (vote_bank, heaviest) = + Self::select_fork(&ancestors, &bank_forks, &tower, &mut progress); + let done = vote_bank.is_none(); + let mut vote_bank_slot = 0; + let reset_bank = vote_bank.as_ref().map(|b| b.0.clone()).or(heaviest); + if let Some((bank, total_staked)) = vote_bank { + info!("voting: {}", bank.slot()); + subscriptions.notify_subscribers(bank.slot(), &bank_forks); + if let Some(votable_leader) = + leader_schedule_cache.slot_leader_at(bank.slot(), Some(&bank)) + { + Self::log_leader_change( + &my_pubkey, + bank.slot(), + &mut current_leader, + 
&votable_leader, + ); + } + vote_bank_slot = bank.slot(); + Self::handle_votable_bank( + &bank, + &bank_forks, + &mut tower, + &mut progress, + &vote_account, + &voting_keypair, + &cluster_info, + &blocktree, + &leader_schedule_cache, + &root_bank_sender, + total_staked, + &lockouts_sender, + &snapshot_package_sender, + )?; + } + if let Some(bank) = reset_bank { + if last_reset != bank.last_blockhash() { + Self::reset_poh_recorder( + &my_pubkey, + &blocktree, + &bank, + &poh_recorder, + &leader_schedule_cache, + ); + last_reset = bank.last_blockhash(); + tpu_has_bank = false; + info!("vote bank: {} reset bank: {}", vote_bank_slot, bank.slot()); + if !partition && vote_bank_slot != bank.slot() { + warn!("PARTITION DETECTED waiting to join fork: {} last vote: {:?}", bank.slot(), tower.last_vote()); + inc_new_counter_info!("replay_stage-partition_detected", 1); + partition = true; + } else if partition && vote_bank_slot == bank.slot() { + warn!("PARTITION resolved fork: {} last vote: {:?}", bank.slot(), tower.last_vote()); + partition = false; + inc_new_counter_info!("replay_stage-partition_resolved", 1); + } + } + } + if done { + break; } - - Self::handle_votable_bank( - &bank, - &bank_forks, - &mut tower, - &mut progress, - &vote_account, - &voting_keypair, - &cluster_info, - &blocktree, - &leader_schedule_cache, - &root_bank_sender, - total_staked, - &lockouts_sender, - &snapshot_package_sender, - )?; - - Self::reset_poh_recorder( - &my_pubkey, - &blocktree, - &bank, - &poh_recorder, - &leader_schedule_cache, - ); - tpu_has_bank = false; } if !tpu_has_bank { @@ -277,7 +313,7 @@ impl ReplayStage { } inc_new_counter_info!( - "replicate_stage-duration", + "replay_stage-duration", duration_as_ms(&now.elapsed()) as usize ); if did_complete_bank { @@ -582,7 +618,7 @@ impl ReplayStage { }; info!( - "{} voted and reset PoH to tick {} (within slot {}). {}", + "{} reset PoH to tick {} (within slot {}). 
{}", my_pubkey, bank.tick_height(), bank.slot(), @@ -643,72 +679,145 @@ impl ReplayStage { did_complete_bank } - #[allow(clippy::type_complexity)] - fn generate_votable_banks( + fn select_fork( ancestors: &HashMap>, bank_forks: &Arc>, tower: &Tower, progress: &mut HashMap, - ) -> Vec<(u128, Arc, HashMap, u64)> { + ) -> VoteAndPoHBank { let tower_start = Instant::now(); - let frozen_banks = bank_forks.read().unwrap().frozen_banks(); - trace!("frozen_banks {}", frozen_banks.len()); - let mut votable: Vec<(u128, Arc, HashMap, u64)> = frozen_banks + let mut frozen_banks: Vec<_> = bank_forks + .read() + .unwrap() + .frozen_banks() .values() - .filter(|b| { - let has_voted = tower.has_voted(b.slot()); - trace!("bank has_voted: {} {}", b.slot(), has_voted); - !has_voted - }) - .filter(|b| { - let is_locked_out = tower.is_locked_out(b.slot(), &ancestors); - trace!("bank is is_locked_out: {} {}", b.slot(), is_locked_out); - !is_locked_out - }) + .cloned() + .collect(); + frozen_banks.sort_by_key(|bank| bank.slot()); + + trace!("frozen_banks {}", frozen_banks.len()); + let stats: Vec = frozen_banks + .iter() .map(|bank| { - ( - bank, - tower.collect_vote_lockouts( + let mut stats = progress + .get(&bank.slot()) + .map(|s| s.fork_stats.clone()) + .unwrap_or_default(); + if !stats.computed { + stats.slot = bank.slot(); + let (stake_lockouts, total_staked) = tower.collect_vote_lockouts( bank.slot(), bank.vote_accounts().into_iter(), &ancestors, - ), - ) - }) - .filter(|(b, (stake_lockouts, total_staked))| { - let vote_threshold = - tower.check_vote_stake_threshold(b.slot(), &stake_lockouts, *total_staked); - Self::confirm_forks(tower, &stake_lockouts, *total_staked, progress, bank_forks); - debug!("bank vote_threshold: {} {}", b.slot(), vote_threshold); - vote_threshold - }) - .map(|(b, (stake_lockouts, total_staked))| { - ( - tower.calculate_weight(&stake_lockouts), - b.clone(), - stake_lockouts, - total_staked, - ) + ); + Self::confirm_forks(tower, &stake_lockouts, total_staked, progress, bank_forks); + stats.total_staked = total_staked; + stats.weight = tower.calculate_weight(&stake_lockouts); + stats.stake_lockouts = stake_lockouts; + stats.block_height = bank.block_height(); + stats.computed = true; + } + stats.vote_threshold = tower.check_vote_stake_threshold( + bank.slot(), + &stats.stake_lockouts, + stats.total_staked, + ); + stats.is_locked_out = tower.is_locked_out(bank.slot(), &ancestors); + stats.has_voted = tower.has_voted(bank.slot()); + stats.is_recent = tower.is_recent(bank.slot()); + if let Some(fp) = progress.get_mut(&bank.slot()) { + fp.fork_stats = stats.clone(); + } + stats }) .collect(); + let mut votable: Vec<_> = frozen_banks + .iter() + .zip(stats.iter()) + .filter(|(_, stats)| stats.is_recent && !stats.has_voted && stats.vote_threshold) + .collect(); - votable.sort_by_key(|b| b.0); - let ms = timing::duration_as_ms(&tower_start.elapsed()); + //highest weight, lowest slot first + votable.sort_by_key(|b| (b.1.weight, 0i64 - b.1.slot as i64)); + votable.iter().for_each(|(_, stats)| { + let mut parents: Vec<_> = if let Some(set) = ancestors.get(&stats.slot) { + set.iter().collect() + } else { + vec![] + }; + parents.sort(); + debug!("{}: {:?} {:?}", stats.slot, stats, parents,); + }); trace!("votable_banks {}", votable.len()); - if !votable.is_empty() { - let weights: Vec = votable.iter().map(|x| x.0).collect(); - info!( - "@{:?} tower duration: {:?} len: {} weights: {:?}", - timing::timestamp(), - ms, - votable.len(), - weights - ); - } + let rv = Self::pick_best_fork(ancestors, 
&votable); + let ms = timing::duration_as_ms(&tower_start.elapsed()); + let weights: Vec<(u128, u64, u64)> = votable + .iter() + .map(|x| (x.1.weight, x.1.slot, x.1.block_height)) + .collect(); + debug!( + "@{:?} tower duration: {:?} len: {}/{} weights: {:?} voting: {}", + timing::timestamp(), + ms, + votable.len(), + stats.iter().filter(|s| !s.has_voted).count(), + weights, + rv.0.is_some() + ); inc_new_counter_info!("replay_stage-tower_duration", ms as usize); + rv + } - votable + fn pick_best_fork( + ancestors: &HashMap>, + best_banks: &[(&Arc, &ForkStats)], + ) -> VoteAndPoHBank { + if best_banks.is_empty() { + return (None, None); + } + let mut rv = None; + let (best_bank, best_stats) = best_banks.last().unwrap(); + debug!("best bank: {:?}", best_stats); + let mut by_slot: Vec<_> = best_banks.iter().collect(); + by_slot.sort_by_key(|x| x.1.slot); + //look for the oldest ancestors of the best bank + if let Some(best_ancestors) = ancestors.get(&best_stats.slot) { + for (parent, parent_stats) in by_slot.iter() { + if parent_stats.is_locked_out { + continue; + } + if !best_ancestors.contains(&parent_stats.slot) { + continue; + } + debug!("best bank found ancestor: {}", parent_stats.slot); + inc_new_counter_info!("replay_stage-pick_best_fork-ancestor", 1); + rv = Some(((*parent).clone(), parent_stats.total_staked)); + } + } + //look for the oldest child of the best bank + if rv.is_none() { + for (child, child_stats) in by_slot.iter().rev() { + if child_stats.is_locked_out { + continue; + } + let has_best = best_stats.slot == child_stats.slot + || ancestors + .get(&child.slot()) + .map(|set| set.contains(&best_stats.slot)) + .unwrap_or(false); + if !has_best { + continue; + } + inc_new_counter_info!("replay_stage-pick_best_fork-child", 1); + debug!("best bank found child: {}", child_stats.slot); + rv = Some(((*child).clone(), child_stats.total_staked)); + } + } + if rv.is_none() { + inc_new_counter_info!("replay_stage-fork_selection-heavy_bank_lockout", 1); + } + (rv, Some((*best_bank).clone())) } fn confirm_forks( diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 023c88046..1a9664fbd 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -3,6 +3,7 @@ use crate::{ cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT}, packet::Packets, + partition_cfg::PartitionCfg, repair_service::RepairStrategy, result::{Error, Result}, streamer::PacketReceiver, @@ -211,6 +212,7 @@ impl RetransmitStage { exit: &Arc, completed_slots_receiver: CompletedSlotsReceiver, epoch_schedule: EpochSchedule, + cfg: Option, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); @@ -239,13 +241,18 @@ impl RetransmitStage { repair_strategy, &leader_schedule_cache.clone(), move |id, shred, working_bank, last_root| { - should_retransmit_and_persist( + let is_connected = cfg + .as_ref() + .map(|x| x.is_connected(&working_bank, &leader_schedule_cache, shred)) + .unwrap_or(true); + let rv = should_retransmit_and_persist( shred, working_bank, &leader_schedule_cache, id, last_root, - ) + ); + rv && is_connected }, ); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index d8e2c0ce1..9be8f1550 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -5,6 +5,7 @@ use crate::blockstream_service::BlockstreamService; use crate::cluster_info::ClusterInfo; use crate::commitment::BlockCommitmentCache; use crate::ledger_cleanup_service::LedgerCleanupService; +use crate::partition_cfg::PartitionCfg; use crate::poh_recorder::PohRecorder; use 
+                let is_connected = cfg
+                    .as_ref()
+                    .map(|x| x.is_connected(&working_bank, &leader_schedule_cache, shred))
+                    .unwrap_or(true);
+                let rv = should_retransmit_and_persist(
                     shred,
                     working_bank,
                     &leader_schedule_cache,
                     id,
                     last_root,
-                )
+                );
+                rv && is_connected
             },
         );
 
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index d8e2c0ce1..9be8f1550 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -5,6 +5,7 @@ use crate::blockstream_service::BlockstreamService;
 use crate::cluster_info::ClusterInfo;
 use crate::commitment::BlockCommitmentCache;
 use crate::ledger_cleanup_service::LedgerCleanupService;
+use crate::partition_cfg::PartitionCfg;
 use crate::poh_recorder::PohRecorder;
 use crate::replay_stage::ReplayStage;
 use crate::retransmit_stage::RetransmitStage;
@@ -72,6 +73,7 @@ impl Tvu {
         completed_slots_receiver: CompletedSlotsReceiver,
         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
         sigverify_disabled: bool,
+        cfg: Option<PartitionCfg>,
     ) -> Self
     where
         T: 'static + KeypairUtil + Sync + Send,
@@ -129,6 +131,7 @@ impl Tvu {
             &exit,
             completed_slots_receiver,
             *bank_forks.read().unwrap().working_bank().epoch_schedule(),
+            cfg,
         );
 
         let (blockstream_slot_sender, blockstream_slot_receiver) = channel();
@@ -290,6 +293,7 @@ pub mod tests {
             completed_slots_receiver,
             block_commitment_cache,
             false,
+            None,
         );
         exit.store(true, Ordering::Relaxed);
         tvu.join().unwrap();
diff --git a/core/src/validator.rs b/core/src/validator.rs
index c6e6e91ee..113118b39 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -6,6 +6,7 @@ use crate::{
     commitment::BlockCommitmentCache,
     contact_info::ContactInfo,
     gossip_service::{discover_cluster, GossipService},
+    partition_cfg::PartitionCfg,
     poh_recorder::PohRecorder,
     poh_service::PohService,
     rpc::JsonRpcConfig,
@@ -59,6 +60,7 @@ pub struct ValidatorConfig {
     pub snapshot_config: Option<SnapshotConfig>,
     pub max_ledger_slots: Option<u64>,
     pub broadcast_stage_type: BroadcastStageType,
+    pub partition_cfg: Option<PartitionCfg>,
 }
 
 impl Default for ValidatorConfig {
@@ -75,6 +77,7 @@ impl Default for ValidatorConfig {
             rpc_config: JsonRpcConfig::default(),
             snapshot_config: None,
             broadcast_stage_type: BroadcastStageType::Standard,
+            partition_cfg: None,
         }
     }
 }
@@ -342,6 +345,7 @@ impl Validator {
             completed_slots_receiver,
             block_commitment_cache,
             config.dev_sigverify_disabled,
+            config.partition_cfg.clone(),
         );
 
         if config.dev_sigverify_disabled {
diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs
index 6fd345146..a80bee220 100644
--- a/local-cluster/src/cluster_tests.rs
+++ b/local-cluster/src/cluster_tests.rs
@@ -192,7 +192,8 @@ pub fn kill_entry_and_spend_and_verify_rest(
     let (cluster_nodes, _) = discover_cluster(&entry_point_info.gossip, nodes).unwrap();
     assert!(cluster_nodes.len() >= nodes);
     let client = create_client(entry_point_info.client_facing_addr(), VALIDATOR_PORT_RANGE);
-    let first_two_epoch_slots = MINIMUM_SLOTS_PER_EPOCH * 3;
+    // sleep long enough to make sure we are in epoch 3
+    let first_two_epoch_slots = MINIMUM_SLOTS_PER_EPOCH * (3 + 1);
 
     for ingress_node in &cluster_nodes {
         client
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index 2061a084c..d5fe6a5b6 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -2,8 +2,11 @@ use log::*;
 use serial_test_derive::serial;
 use solana_client::thin_client::create_client;
 use solana_core::{
-    broadcast_stage::BroadcastStageType, consensus::VOTE_THRESHOLD_DEPTH,
-    gossip_service::discover_cluster, validator::ValidatorConfig,
+    broadcast_stage::BroadcastStageType,
+    consensus::VOTE_THRESHOLD_DEPTH,
+    gossip_service::discover_cluster,
+    partition_cfg::{Partition, PartitionCfg},
+    validator::ValidatorConfig,
 };
 use solana_ledger::{bank_forks::SnapshotConfig, blocktree::Blocktree, snapshot_utils};
 use solana_local_cluster::{
@@ -12,6 +15,7 @@ use solana_local_cluster::{
     local_cluster::{ClusterConfig, LocalCluster},
 };
 use solana_runtime::accounts_db::AccountsDB;
+use solana_sdk::timing::timestamp;
 use solana_sdk::{
     client::SyncClient,
     clock,
@@ -183,6 +187,96 @@ fn test_leader_failure_4() {
         config.ticks_per_slot * config.poh_config.target_tick_duration.as_millis() as u64,
     );
 }
+
+fn run_network_partition(partitions: &[usize]) {
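+    // Each validator only accepts shreds from leaders inside its assigned partition
+    // slice during [partition_start, partition_end); afterwards, verify that spends
+    // still confirm on every node.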
+    solana_logger::setup();
+    info!("PARTITION_TEST!");
+    let num_nodes = partitions.iter().sum();
+    let validator_config = ValidatorConfig::default();
+    let mut config = ClusterConfig {
+        cluster_lamports: 10_000,
+        node_stakes: vec![100; num_nodes],
+        validator_configs: vec![validator_config.clone(); num_nodes],
+        ..ClusterConfig::default()
+    };
+    let now = timestamp();
+    let partition_start = now + 30_000;
+    let partition_end = partition_start + 10_000;
+    let mut total = 0;
+    for (j, pn) in partitions.iter().enumerate() {
+        info!(
+            "PARTITION_TEST configuring partition {} for nodes {} - {}",
+            j,
+            total,
+            total + *pn
+        );
+        for i in total..(total + *pn) {
+            let mut p1 = Partition::default();
+            p1.num_partitions = partitions.len();
+            p1.my_partition = j;
+            p1.start_ts = partition_start;
+            p1.end_ts = partition_end;
+            config.validator_configs[i].partition_cfg = Some(PartitionCfg::new(vec![p1]));
+        }
+        total += *pn;
+    }
+    info!(
+        "PARTITION_TEST starting cluster with {:?} partitions",
+        partitions
+    );
+    let cluster = LocalCluster::new(&config);
+    let now = timestamp();
+    let timeout = partition_start as i64 - now as i64;
+    info!(
+        "PARTITION_TEST sleeping until partition start timeout {}",
+        timeout
+    );
+    if timeout > 0 {
+        sleep(Duration::from_millis(timeout as u64));
+    }
+    info!("PARTITION_TEST done sleeping until partition start timeout");
+    let now = timestamp();
+    let timeout = partition_end as i64 - now as i64;
+    info!(
+        "PARTITION_TEST sleeping until partition end timeout {}",
+        timeout
+    );
+    if timeout > 0 {
+        sleep(Duration::from_millis(timeout as u64));
+    }
+    info!("PARTITION_TEST done sleeping until partition end timeout");
+    info!("PARTITION_TEST spending on all nodes");
+    cluster_tests::spend_and_verify_all_nodes(
+        &cluster.entry_point_info,
+        &cluster.funding_keypair,
+        num_nodes,
+        HashSet::new(),
+    );
+    info!("PARTITION_TEST done spending on all nodes");
+}
+
+#[allow(unused_attributes)]
+#[ignore]
+#[test]
+#[serial]
+fn test_network_partition_1_2() {
+    run_network_partition(&[1, 2])
+}
+
+#[allow(unused_attributes)]
+#[ignore]
+#[test]
+#[serial]
+fn test_network_partition_1_1() {
+    run_network_partition(&[1, 1])
+}
+
+#[test]
+#[serial]
+fn test_network_partition_1_1_1() {
+    run_network_partition(&[1, 1, 1])
+}
+
 #[test]
 #[serial]
 fn test_two_unbalanced_stakes() {
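Editor's note (illustrative sketch, not part of the patch): a minimal example of enabling a
partition window on a single ValidatorConfig outside the test harness above, mirroring what
run_network_partition does. The function name and the 30s/10s offsets are arbitrary; only the
types, fields, and constructors shown in the diff are assumed.

    use solana_core::partition_cfg::{Partition, PartitionCfg};
    use solana_core::validator::ValidatorConfig;
    use solana_sdk::timing::timestamp;

    fn partitioned_validator_config() -> ValidatorConfig {
        // Keep only shreds whose slot leader falls in slice 0 of 2 while the window is active.
        let mut p = Partition::default();
        p.num_partitions = 2;
        p.my_partition = 0;
        p.start_ts = timestamp() + 30_000; // window opens 30s from now (arbitrary)
        p.end_ts = p.start_ts + 10_000; // and closes 10s later (arbitrary)

        let mut config = ValidatorConfig::default();
        config.partition_cfg = Some(PartitionCfg::new(vec![p]));
        config
    }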