Fix rules for fork selection (#6906)

automerge
This commit is contained in:
anatoly yakovenko 2019-11-15 08:36:33 -08:00 committed by Grimes
parent e1643c91c4
commit 59413b3124
9 changed files with 424 additions and 104 deletions

View File

@ -11,7 +11,7 @@ use std::{
pub const VOTE_THRESHOLD_DEPTH: usize = 8;
pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64;
#[derive(Default, Debug)]
#[derive(Default, Debug, Clone)]
pub struct StakeLockout {
lockout: u64,
stake: u64,
@ -258,7 +258,7 @@ impl Tower {
}
// a slot is not recent if it's older than the newest vote we have
fn is_recent(&self, slot: u64) -> bool {
pub fn is_recent(&self, slot: u64) -> bool {
if let Some(last_vote) = self.lockouts.votes.back() {
if slot <= last_vote.slot {
return false;
@ -316,7 +316,15 @@ impl Tower {
let vote = lockouts.nth_recent_vote(self.threshold_depth);
if let Some(vote) = vote {
if let Some(fork_stake) = stake_lockouts.get(&vote.slot) {
(fork_stake.stake as f64 / total_staked as f64) > self.threshold_size
let lockout = fork_stake.stake as f64 / total_staked as f64;
trace!(
"fork_stake {} {} {} {}",
slot,
lockout,
fork_stake.stake,
total_staked
);
lockout > self.threshold_size
} else {
false
}

View File

@ -33,6 +33,7 @@ pub mod gossip_service;
pub mod ledger_cleanup_service;
pub mod local_vote_signer_service;
pub mod packet;
pub mod partition_cfg;
pub mod poh_recorder;
pub mod poh_service;
pub mod recvmmsg;

92
core/src/partition_cfg.rs Normal file
View File

@ -0,0 +1,92 @@
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
use solana_ledger::shred::Shred;
use solana_runtime::bank::Bank;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::RwLock;
///Configure a partition in the retransmit stage
#[derive(Debug, Clone)]
pub struct Partition {
pub num_partitions: usize,
pub my_partition: usize,
pub start_ts: u64,
pub end_ts: u64,
leaders: Arc<RwLock<Vec<Pubkey>>>,
}
impl Default for Partition {
fn default() -> Self {
Self {
num_partitions: 0,
my_partition: 0,
start_ts: 0,
end_ts: 0,
leaders: Arc::new(RwLock::new(vec![])),
}
}
}
#[derive(Default, Debug, Clone)]
pub struct PartitionCfg {
partitions: Vec<Partition>,
}
impl PartitionCfg {
pub fn new(partitions: Vec<Partition>) -> Self {
Self { partitions }
}
pub fn is_connected(
&self,
bank: &Option<Arc<Bank>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
shred: &Shred,
) -> bool {
if bank.is_none() {
return true;
}
let bank = bank.as_ref().unwrap().clone();
let slot_leader_pubkey = leader_schedule_cache.slot_leader_at(shred.slot(), Some(&bank));
let slot_leader_pubkey = slot_leader_pubkey.unwrap_or_default();
let time = timestamp();
for p in &self.partitions {
let is_time = (p.start_ts <= time) && (time < p.end_ts);
if !is_time {
continue;
}
trace!("PARTITION_TEST partition time! {}", p.my_partition);
if p.num_partitions == 0 {
continue;
}
if p.leaders.read().unwrap().is_empty() {
let mut leader_vec = p.leaders.write().unwrap();
let mut leaders: Vec<Pubkey> = bank.vote_accounts().keys().cloned().collect();
leaders.sort();
*leader_vec = leaders;
warn!("PARTITION_TEST partition enabled {}", p.my_partition);
}
let is_connected: bool = {
let leaders = p.leaders.read().unwrap();
let start = p.my_partition * leaders.len() / p.num_partitions;
let partition_size = leaders.len() / p.num_partitions;
let end = start + partition_size;
let end = if leaders.len() - end < partition_size {
leaders.len()
} else {
end
};
let my_leaders: HashSet<_> = leaders[start..end].iter().collect();
my_leaders.contains(&slot_leader_pubkey)
};
if is_connected {
trace!("PARTITION_TEST connected {}", p.my_partition);
continue;
}
trace!("PARTITION_TEST not connected {}", p.my_partition);
return false;
}
trace!("PARTITION_TEST connected");
true
}
}

View File

@ -42,6 +42,8 @@ use std::{
pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
type VoteAndPoHBank = (Option<(Arc<Bank>, u64)>, Option<Arc<Bank>>);
// Implement a destructor for the ReplayStage thread to signal it exited
// even on panics
struct Finalizer {
@ -76,6 +78,20 @@ struct ReplaySlotStats {
replay_start: Instant,
}
#[derive(Debug, Clone, Default)]
struct ForkStats {
weight: u128,
total_staked: u64,
slot: Slot,
block_height: u64,
has_voted: bool,
is_recent: bool,
vote_threshold: bool,
is_locked_out: bool,
stake_lockouts: HashMap<u64, StakeLockout>,
computed: bool,
}
impl ReplaySlotStats {
pub fn new(slot: Slot) -> Self {
Self {
@ -123,6 +139,7 @@ struct ForkProgress {
started_ms: u64,
is_dead: bool,
stats: ReplaySlotStats,
fork_stats: ForkStats,
}
impl ForkProgress {
@ -135,6 +152,7 @@ impl ForkProgress {
started_ms: timing::timestamp(),
is_dead: false,
stats: ReplaySlotStats::new(slot),
fork_stats: ForkStats::default(),
}
}
}
@ -186,7 +204,8 @@ impl ReplayStage {
let _exit = Finalizer::new(exit_.clone());
let mut progress = HashMap::new();
let mut current_leader = None;
let mut last_reset = Hash::default();
let mut partition = false;
loop {
let now = Instant::now();
// Stop getting entries if we get exit signal
@ -211,51 +230,68 @@ impl ReplayStage {
);
let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
let votable = Self::generate_votable_banks(
&ancestors,
&bank_forks,
&tower,
&mut progress,
);
if let Some((_, bank, _, total_staked)) = votable.into_iter().last() {
subscriptions.notify_subscribers(bank.slot(), &bank_forks);
if let Some(votable_leader) =
leader_schedule_cache.slot_leader_at(bank.slot(), Some(&bank))
{
Self::log_leader_change(
&my_pubkey,
bank.slot(),
&mut current_leader,
&votable_leader,
);
loop {
let (vote_bank, heaviest) =
Self::select_fork(&ancestors, &bank_forks, &tower, &mut progress);
let done = vote_bank.is_none();
let mut vote_bank_slot = 0;
let reset_bank = vote_bank.as_ref().map(|b| b.0.clone()).or(heaviest);
if let Some((bank, total_staked)) = vote_bank {
info!("voting: {}", bank.slot());
subscriptions.notify_subscribers(bank.slot(), &bank_forks);
if let Some(votable_leader) =
leader_schedule_cache.slot_leader_at(bank.slot(), Some(&bank))
{
Self::log_leader_change(
&my_pubkey,
bank.slot(),
&mut current_leader,
&votable_leader,
);
}
vote_bank_slot = bank.slot();
Self::handle_votable_bank(
&bank,
&bank_forks,
&mut tower,
&mut progress,
&vote_account,
&voting_keypair,
&cluster_info,
&blocktree,
&leader_schedule_cache,
&root_bank_sender,
total_staked,
&lockouts_sender,
&snapshot_package_sender,
)?;
}
if let Some(bank) = reset_bank {
if last_reset != bank.last_blockhash() {
Self::reset_poh_recorder(
&my_pubkey,
&blocktree,
&bank,
&poh_recorder,
&leader_schedule_cache,
);
last_reset = bank.last_blockhash();
tpu_has_bank = false;
info!("vote bank: {} reset bank: {}", vote_bank_slot, bank.slot());
if !partition && vote_bank_slot != bank.slot() {
warn!("PARTITION DETECTED waiting to join fork: {} last vote: {:?}", bank.slot(), tower.last_vote());
inc_new_counter_info!("replay_stage-partition_detected", 1);
partition = true;
} else if partition && vote_bank_slot == bank.slot() {
warn!("PARTITION resolved fork: {} last vote: {:?}", bank.slot(), tower.last_vote());
partition = false;
inc_new_counter_info!("replay_stage-partition_resolved", 1);
}
}
}
if done {
break;
}
Self::handle_votable_bank(
&bank,
&bank_forks,
&mut tower,
&mut progress,
&vote_account,
&voting_keypair,
&cluster_info,
&blocktree,
&leader_schedule_cache,
&root_bank_sender,
total_staked,
&lockouts_sender,
&snapshot_package_sender,
)?;
Self::reset_poh_recorder(
&my_pubkey,
&blocktree,
&bank,
&poh_recorder,
&leader_schedule_cache,
);
tpu_has_bank = false;
}
if !tpu_has_bank {
@ -277,7 +313,7 @@ impl ReplayStage {
}
inc_new_counter_info!(
"replicate_stage-duration",
"replay_stage-duration",
duration_as_ms(&now.elapsed()) as usize
);
if did_complete_bank {
@ -582,7 +618,7 @@ impl ReplayStage {
};
info!(
"{} voted and reset PoH to tick {} (within slot {}). {}",
"{} reset PoH to tick {} (within slot {}). {}",
my_pubkey,
bank.tick_height(),
bank.slot(),
@ -643,72 +679,145 @@ impl ReplayStage {
did_complete_bank
}
#[allow(clippy::type_complexity)]
fn generate_votable_banks(
fn select_fork(
ancestors: &HashMap<u64, HashSet<u64>>,
bank_forks: &Arc<RwLock<BankForks>>,
tower: &Tower,
progress: &mut HashMap<u64, ForkProgress>,
) -> Vec<(u128, Arc<Bank>, HashMap<u64, StakeLockout>, u64)> {
) -> VoteAndPoHBank {
let tower_start = Instant::now();
let frozen_banks = bank_forks.read().unwrap().frozen_banks();
trace!("frozen_banks {}", frozen_banks.len());
let mut votable: Vec<(u128, Arc<Bank>, HashMap<u64, StakeLockout>, u64)> = frozen_banks
let mut frozen_banks: Vec<_> = bank_forks
.read()
.unwrap()
.frozen_banks()
.values()
.filter(|b| {
let has_voted = tower.has_voted(b.slot());
trace!("bank has_voted: {} {}", b.slot(), has_voted);
!has_voted
})
.filter(|b| {
let is_locked_out = tower.is_locked_out(b.slot(), &ancestors);
trace!("bank is is_locked_out: {} {}", b.slot(), is_locked_out);
!is_locked_out
})
.cloned()
.collect();
frozen_banks.sort_by_key(|bank| bank.slot());
trace!("frozen_banks {}", frozen_banks.len());
let stats: Vec<ForkStats> = frozen_banks
.iter()
.map(|bank| {
(
bank,
tower.collect_vote_lockouts(
let mut stats = progress
.get(&bank.slot())
.map(|s| s.fork_stats.clone())
.unwrap_or_default();
if !stats.computed {
stats.slot = bank.slot();
let (stake_lockouts, total_staked) = tower.collect_vote_lockouts(
bank.slot(),
bank.vote_accounts().into_iter(),
&ancestors,
),
)
})
.filter(|(b, (stake_lockouts, total_staked))| {
let vote_threshold =
tower.check_vote_stake_threshold(b.slot(), &stake_lockouts, *total_staked);
Self::confirm_forks(tower, &stake_lockouts, *total_staked, progress, bank_forks);
debug!("bank vote_threshold: {} {}", b.slot(), vote_threshold);
vote_threshold
})
.map(|(b, (stake_lockouts, total_staked))| {
(
tower.calculate_weight(&stake_lockouts),
b.clone(),
stake_lockouts,
total_staked,
)
);
Self::confirm_forks(tower, &stake_lockouts, total_staked, progress, bank_forks);
stats.total_staked = total_staked;
stats.weight = tower.calculate_weight(&stake_lockouts);
stats.stake_lockouts = stake_lockouts;
stats.block_height = bank.block_height();
stats.computed = true;
}
stats.vote_threshold = tower.check_vote_stake_threshold(
bank.slot(),
&stats.stake_lockouts,
stats.total_staked,
);
stats.is_locked_out = tower.is_locked_out(bank.slot(), &ancestors);
stats.has_voted = tower.has_voted(bank.slot());
stats.is_recent = tower.is_recent(bank.slot());
if let Some(fp) = progress.get_mut(&bank.slot()) {
fp.fork_stats = stats.clone();
}
stats
})
.collect();
let mut votable: Vec<_> = frozen_banks
.iter()
.zip(stats.iter())
.filter(|(_, stats)| stats.is_recent && !stats.has_voted && stats.vote_threshold)
.collect();
votable.sort_by_key(|b| b.0);
let ms = timing::duration_as_ms(&tower_start.elapsed());
//highest weight, lowest slot first
votable.sort_by_key(|b| (b.1.weight, 0i64 - b.1.slot as i64));
votable.iter().for_each(|(_, stats)| {
let mut parents: Vec<_> = if let Some(set) = ancestors.get(&stats.slot) {
set.iter().collect()
} else {
vec![]
};
parents.sort();
debug!("{}: {:?} {:?}", stats.slot, stats, parents,);
});
trace!("votable_banks {}", votable.len());
if !votable.is_empty() {
let weights: Vec<u128> = votable.iter().map(|x| x.0).collect();
info!(
"@{:?} tower duration: {:?} len: {} weights: {:?}",
timing::timestamp(),
ms,
votable.len(),
weights
);
}
let rv = Self::pick_best_fork(ancestors, &votable);
let ms = timing::duration_as_ms(&tower_start.elapsed());
let weights: Vec<(u128, u64, u64)> = votable
.iter()
.map(|x| (x.1.weight, x.1.slot, x.1.block_height))
.collect();
debug!(
"@{:?} tower duration: {:?} len: {}/{} weights: {:?} voting: {}",
timing::timestamp(),
ms,
votable.len(),
stats.iter().filter(|s| !s.has_voted).count(),
weights,
rv.0.is_some()
);
inc_new_counter_info!("replay_stage-tower_duration", ms as usize);
rv
}
votable
fn pick_best_fork(
ancestors: &HashMap<u64, HashSet<u64>>,
best_banks: &[(&Arc<Bank>, &ForkStats)],
) -> VoteAndPoHBank {
if best_banks.is_empty() {
return (None, None);
}
let mut rv = None;
let (best_bank, best_stats) = best_banks.last().unwrap();
debug!("best bank: {:?}", best_stats);
let mut by_slot: Vec<_> = best_banks.iter().collect();
by_slot.sort_by_key(|x| x.1.slot);
//look for the oldest ancestors of the best bank
if let Some(best_ancestors) = ancestors.get(&best_stats.slot) {
for (parent, parent_stats) in by_slot.iter() {
if parent_stats.is_locked_out {
continue;
}
if !best_ancestors.contains(&parent_stats.slot) {
continue;
}
debug!("best bank found ancestor: {}", parent_stats.slot);
inc_new_counter_info!("replay_stage-pick_best_fork-ancestor", 1);
rv = Some(((*parent).clone(), parent_stats.total_staked));
}
}
//look for the oldest child of the best bank
if rv.is_none() {
for (child, child_stats) in by_slot.iter().rev() {
if child_stats.is_locked_out {
continue;
}
let has_best = best_stats.slot == child_stats.slot
|| ancestors
.get(&child.slot())
.map(|set| set.contains(&best_stats.slot))
.unwrap_or(false);
if !has_best {
continue;
}
inc_new_counter_info!("replay_stage-pick_best_fork-child", 1);
debug!("best bank found child: {}", child_stats.slot);
rv = Some(((*child).clone(), child_stats.total_staked));
}
}
if rv.is_none() {
inc_new_counter_info!("replay_stage-fork_selection-heavy_bank_lockout", 1);
}
(rv, Some((*best_bank).clone()))
}
fn confirm_forks(

View File

@ -3,6 +3,7 @@
use crate::{
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
packet::Packets,
partition_cfg::PartitionCfg,
repair_service::RepairStrategy,
result::{Error, Result},
streamer::PacketReceiver,
@ -211,6 +212,7 @@ impl RetransmitStage {
exit: &Arc<AtomicBool>,
completed_slots_receiver: CompletedSlotsReceiver,
epoch_schedule: EpochSchedule,
cfg: Option<PartitionCfg>,
) -> Self {
let (retransmit_sender, retransmit_receiver) = channel();
@ -239,13 +241,18 @@ impl RetransmitStage {
repair_strategy,
&leader_schedule_cache.clone(),
move |id, shred, working_bank, last_root| {
should_retransmit_and_persist(
let is_connected = cfg
.as_ref()
.map(|x| x.is_connected(&working_bank, &leader_schedule_cache, shred))
.unwrap_or(true);
let rv = should_retransmit_and_persist(
shred,
working_bank,
&leader_schedule_cache,
id,
last_root,
)
);
rv && is_connected
},
);

View File

@ -5,6 +5,7 @@ use crate::blockstream_service::BlockstreamService;
use crate::cluster_info::ClusterInfo;
use crate::commitment::BlockCommitmentCache;
use crate::ledger_cleanup_service::LedgerCleanupService;
use crate::partition_cfg::PartitionCfg;
use crate::poh_recorder::PohRecorder;
use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage;
@ -72,6 +73,7 @@ impl Tvu {
completed_slots_receiver: CompletedSlotsReceiver,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
sigverify_disabled: bool,
cfg: Option<PartitionCfg>,
) -> Self
where
T: 'static + KeypairUtil + Sync + Send,
@ -129,6 +131,7 @@ impl Tvu {
&exit,
completed_slots_receiver,
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
cfg,
);
let (blockstream_slot_sender, blockstream_slot_receiver) = channel();
@ -290,6 +293,7 @@ pub mod tests {
completed_slots_receiver,
block_commitment_cache,
false,
None,
);
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();

View File

@ -6,6 +6,7 @@ use crate::{
commitment::BlockCommitmentCache,
contact_info::ContactInfo,
gossip_service::{discover_cluster, GossipService},
partition_cfg::PartitionCfg,
poh_recorder::PohRecorder,
poh_service::PohService,
rpc::JsonRpcConfig,
@ -59,6 +60,7 @@ pub struct ValidatorConfig {
pub snapshot_config: Option<SnapshotConfig>,
pub max_ledger_slots: Option<u64>,
pub broadcast_stage_type: BroadcastStageType,
pub partition_cfg: Option<PartitionCfg>,
}
impl Default for ValidatorConfig {
@ -75,6 +77,7 @@ impl Default for ValidatorConfig {
rpc_config: JsonRpcConfig::default(),
snapshot_config: None,
broadcast_stage_type: BroadcastStageType::Standard,
partition_cfg: None,
}
}
}
@ -342,6 +345,7 @@ impl Validator {
completed_slots_receiver,
block_commitment_cache,
config.dev_sigverify_disabled,
config.partition_cfg.clone(),
);
if config.dev_sigverify_disabled {

View File

@ -192,7 +192,8 @@ pub fn kill_entry_and_spend_and_verify_rest(
let (cluster_nodes, _) = discover_cluster(&entry_point_info.gossip, nodes).unwrap();
assert!(cluster_nodes.len() >= nodes);
let client = create_client(entry_point_info.client_facing_addr(), VALIDATOR_PORT_RANGE);
let first_two_epoch_slots = MINIMUM_SLOTS_PER_EPOCH * 3;
// sleep long enough to make sure we are in epoch 3
let first_two_epoch_slots = MINIMUM_SLOTS_PER_EPOCH * (3 + 1);
for ingress_node in &cluster_nodes {
client

View File

@ -2,8 +2,11 @@ use log::*;
use serial_test_derive::serial;
use solana_client::thin_client::create_client;
use solana_core::{
broadcast_stage::BroadcastStageType, consensus::VOTE_THRESHOLD_DEPTH,
gossip_service::discover_cluster, validator::ValidatorConfig,
broadcast_stage::BroadcastStageType,
consensus::VOTE_THRESHOLD_DEPTH,
gossip_service::discover_cluster,
partition_cfg::{Partition, PartitionCfg},
validator::ValidatorConfig,
};
use solana_ledger::{bank_forks::SnapshotConfig, blocktree::Blocktree, snapshot_utils};
use solana_local_cluster::{
@ -12,6 +15,7 @@ use solana_local_cluster::{
local_cluster::{ClusterConfig, LocalCluster},
};
use solana_runtime::accounts_db::AccountsDB;
use solana_sdk::timing::timestamp;
use solana_sdk::{
client::SyncClient,
clock,
@ -183,6 +187,96 @@ fn test_leader_failure_4() {
config.ticks_per_slot * config.poh_config.target_tick_duration.as_millis() as u64,
);
}
fn run_network_partition(partitions: &[usize]) {
solana_logger::setup();
info!("PARTITION_TEST!");
let num_nodes = partitions.iter().sum();
let validator_config = ValidatorConfig::default();
let mut config = ClusterConfig {
cluster_lamports: 10_000,
node_stakes: vec![100; num_nodes],
validator_configs: vec![validator_config.clone(); num_nodes],
..ClusterConfig::default()
};
let now = timestamp();
let partition_start = now + 30_000;
let partition_end = partition_start + 10_000;
let mut total = 0;
for (j, pn) in partitions.iter().enumerate() {
info!(
"PARTITION_TEST configuring partition {} for nodes {} - {}",
j,
total,
total + *pn
);
for i in total..(total + *pn) {
let mut p1 = Partition::default();
p1.num_partitions = partitions.len();
p1.my_partition = j;
p1.start_ts = partition_start;
p1.end_ts = partition_end;
config.validator_configs[i].partition_cfg = Some(PartitionCfg::new(vec![p1]));
}
total += *pn;
}
info!(
"PARTITION_TEST starting cluster with {:?} partitions",
partitions
);
let cluster = LocalCluster::new(&config);
let now = timestamp();
let timeout = partition_start as i64 - now as i64;
info!(
"PARTITION_TEST sleeping until partition start timeout {}",
timeout
);
if timeout > 0 {
sleep(Duration::from_millis(timeout as u64));
}
info!("PARTITION_TEST done sleeping until partition start timeout");
let now = timestamp();
let timeout = partition_end as i64 - now as i64;
info!(
"PARTITION_TEST sleeping until partition end timeout {}",
timeout
);
if timeout > 0 {
sleep(Duration::from_millis(timeout as u64));
}
info!("PARTITION_TEST done sleeping until partition end timeout");
info!("PARTITION_TEST spending on all ndoes");
cluster_tests::spend_and_verify_all_nodes(
&cluster.entry_point_info,
&cluster.funding_keypair,
num_nodes,
HashSet::new(),
);
info!("PARTITION_TEST done spending on all ndoes");
}
#[allow(unused_attributes)]
#[ignore]
#[test]
#[serial]
fn test_network_partition_1_2() {
run_network_partition(&[1, 2])
}
#[allow(unused_attributes)]
#[ignore]
#[test]
#[serial]
fn test_network_partition_1_1() {
run_network_partition(&[1, 1])
}
#[test]
#[serial]
fn test_network_partition_1_1_1() {
run_network_partition(&[1, 1, 1])
}
#[test]
#[serial]
fn test_two_unbalanced_stakes() {