Consensus fix, don't consider threshold check if.. (#7948)
* Consensus fix, don't consider threshold check if lockouts are not increased
* Change partition tests to wait for epoch with > lockout slots
* Use atomic bool to signal partition
parent 912aafcefd
commit 7058287273
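The consensus hunk below changes `Tower::check_vote_stake_threshold` so the stake threshold is only enforced when the simulated vote actually deepens a lockout beyond `threshold_depth`; if no lockout's confirmation count increased, the vote is allowed regardless of the observed fork stake. A minimal standalone sketch of that rule follows. It mirrors the names in the diff (`slot`, `confirmation_count`, `threshold_depth`, `threshold_size`) but uses simplified stand-in types, not the real `Tower` API:

// Minimal sketch of the "only check the threshold when lockouts grew" rule.
// `Lockout` is a simplified stand-in for the real vote-state type.
struct Lockout {
    slot: u64,
    confirmation_count: u32,
}

/// Returns true when the vote passes the threshold check, or when the check
/// can be skipped because the vote did not deepen any lockout beyond
/// `threshold_depth`.
fn vote_passes_threshold(
    new_lockouts: &[Lockout], // tower state after simulating the vote
    old_lockouts: &[Lockout], // current tower state
    fork_stake_ratio: f64,    // stake voting on this fork / total stake
    threshold_depth: usize,
    threshold_size: f64,
) -> bool {
    for (new, old) in new_lockouts.iter().zip(old_lockouts.iter()) {
        if new.slot != old.slot {
            break; // tower was reshaped; nothing left to compare
        }
        if new.confirmation_count <= threshold_depth as u32 {
            break; // shallower than the threshold depth; not relevant
        }
        if new.confirmation_count != old.confirmation_count {
            // The vote deepens a lockout past the threshold depth, so the
            // fork must actually carry enough stake.
            return fork_stake_ratio > threshold_size;
        }
    }
    // No lockout above the threshold depth was increased; the vote is safe
    // to cast regardless of the fork's observed stake.
    true
}

fn main() {
    let old = [Lockout { slot: 1, confirmation_count: 2 }];
    let new = [Lockout { slot: 1, confirmation_count: 3 }];
    // Deepened past a threshold depth of 1 with only 10% stake: rejected.
    assert!(!vote_passes_threshold(&new, &old, 0.10, 1, 0.67));
    // Same tower, lockout unchanged: the threshold check is skipped.
    assert!(vote_passes_threshold(&old, &old, 0.10, 1, 0.67));
    println!("threshold sketch ok");
}

This matches the commit's first bullet: the stake threshold only matters for votes that actually increase lockouts.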
@@ -321,13 +321,27 @@ impl Tower {
if let Some(fork_stake) = stake_lockouts.get(&vote.slot) {
let lockout = fork_stake.stake as f64 / total_staked as f64;
trace!(
"fork_stake {} {} {} {}",
"fork_stake slot: {} lockout: {} fork_stake: {} total_stake: {}",
slot,
lockout,
fork_stake.stake,
total_staked
);
lockout > self.threshold_size
for (new_lockout, original_lockout) in
lockouts.votes.iter().zip(self.lockouts.votes.iter())
{
if new_lockout.slot == original_lockout.slot {
if new_lockout.confirmation_count <= self.threshold_depth as u32 {
break;
}
if new_lockout.confirmation_count != original_lockout.confirmation_count {
return lockout > self.threshold_size;
}
} else {
break;
}
}
true
} else {
false
}
@@ -742,6 +756,34 @@ mod test {
assert!(!tower.check_vote_stake_threshold(1, &stakes, 2));
}

#[test]
fn test_check_vote_threshold_lockouts_not_updated() {
solana_logger::setup();
let mut tower = Tower::new_for_tests(1, 0.67);
let stakes = vec![
(
0,
StakeLockout {
stake: 1,
lockout: 8,
},
),
(
1,
StakeLockout {
stake: 2,
lockout: 8,
},
),
]
.into_iter()
.collect();
tower.record_vote(0, Hash::default());
tower.record_vote(1, Hash::default());
tower.record_vote(2, Hash::default());
assert!(tower.check_vote_stake_threshold(6, &stakes, 2));
}

#[test]
fn test_lockout_is_updated_for_entire_branch() {
let mut stake_lockouts = HashMap::new();
@@ -30,7 +30,6 @@ pub mod gossip_service;
pub mod ledger_cleanup_service;
pub mod local_vote_signer_service;
pub mod packet;
pub mod partition_cfg;
pub mod poh_recorder;
pub mod poh_service;
pub mod recvmmsg;
@@ -1,92 +0,0 @@
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
use solana_ledger::shred::Shred;
use solana_runtime::bank::Bank;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::RwLock;

///Configure a partition in the retransmit stage
#[derive(Debug, Clone)]
pub struct Partition {
pub num_partitions: usize,
pub my_partition: usize,
pub start_ts: u64,
pub end_ts: u64,
leaders: Arc<RwLock<Vec<Pubkey>>>,
}
impl Default for Partition {
fn default() -> Self {
Self {
num_partitions: 0,
my_partition: 0,
start_ts: 0,
end_ts: 0,
leaders: Arc::new(RwLock::new(vec![])),
}
}
}

#[derive(Default, Debug, Clone)]
pub struct PartitionCfg {
partitions: Vec<Partition>,
}

impl PartitionCfg {
pub fn new(partitions: Vec<Partition>) -> Self {
Self { partitions }
}
pub fn is_connected(
&self,
bank: &Option<Arc<Bank>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
shred: &Shred,
) -> bool {
if bank.is_none() {
return true;
}
let bank = bank.as_ref().unwrap().clone();
let slot_leader_pubkey = leader_schedule_cache.slot_leader_at(shred.slot(), Some(&bank));
let slot_leader_pubkey = slot_leader_pubkey.unwrap_or_default();
let time = timestamp();
for p in &self.partitions {
let is_time = (p.start_ts <= time) && (time < p.end_ts);
if !is_time {
continue;
}
trace!("PARTITION_TEST partition time! {}", p.my_partition);
if p.num_partitions == 0 {
continue;
}
if p.leaders.read().unwrap().is_empty() {
let mut leader_vec = p.leaders.write().unwrap();
let mut leaders: Vec<Pubkey> = bank.vote_accounts().keys().cloned().collect();
leaders.sort();
*leader_vec = leaders;
warn!("PARTITION_TEST partition enabled {}", p.my_partition);
}
let is_connected: bool = {
let leaders = p.leaders.read().unwrap();
let start = p.my_partition * leaders.len() / p.num_partitions;
let partition_size = leaders.len() / p.num_partitions;
let end = start + partition_size;
let end = if leaders.len() - end < partition_size {
leaders.len()
} else {
end
};
let my_leaders: HashSet<_> = leaders[start..end].iter().collect();
my_leaders.contains(&slot_leader_pubkey)
};
if is_connected {
trace!("PARTITION_TEST connected {}", p.my_partition);
continue;
}
trace!("PARTITION_TEST not connected {}", p.my_partition);
return false;
}
trace!("PARTITION_TEST connected");
true
}
}
@@ -3,7 +3,6 @@
use crate::{
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
packet::Packets,
partition_cfg::PartitionCfg,
repair_service::RepairStrategy,
result::{Error, Result},
streamer::PacketReceiver,
@@ -22,7 +21,7 @@ use solana_sdk::epoch_schedule::EpochSchedule;
use std::{
cmp,
net::UdpSocket,
sync::atomic::AtomicBool,
sync::atomic::{AtomicBool, Ordering},
sync::mpsc::channel,
sync::mpsc::RecvTimeoutError,
sync::Mutex,
@@ -213,7 +212,7 @@ impl RetransmitStage {
exit: &Arc<AtomicBool>,
completed_slots_receiver: CompletedSlotsReceiver,
epoch_schedule: EpochSchedule,
cfg: Option<PartitionCfg>,
cfg: Option<Arc<AtomicBool>>,
shred_version: u16,
) -> Self {
let (retransmit_sender, retransmit_receiver) = channel();
@@ -245,7 +244,7 @@
move |id, shred, working_bank, last_root| {
let is_connected = cfg
.as_ref()
.map(|x| x.is_connected(&working_bank, &leader_schedule_cache, shred))
.map(|x| x.load(Ordering::Relaxed))
.unwrap_or(true);
let rv = should_retransmit_and_persist(
shred,
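The retransmit-stage hunks above replace the `PartitionCfg::is_connected` probe with a shared flag: `cfg` becomes an `Option<Arc<AtomicBool>>`, and the retransmit closure simply loads it. A minimal sketch of that pattern (the `is_connected` helper and its call sites here are illustrative stand-ins, not the real retransmit API):

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

// Hypothetical stand-in for the retransmit decision: retransmission is gated
// on an optional shared flag; when no flag is configured, stay connected.
fn is_connected(cfg: &Option<Arc<AtomicBool>>) -> bool {
    cfg.as_ref()
        .map(|flag| flag.load(Ordering::Relaxed))
        .unwrap_or(true)
}

fn main() {
    // Production path: no flag configured, always connected.
    assert!(is_connected(&None));

    // Test path: the cluster test owns the flag and toggles it.
    let enable = Arc::new(AtomicBool::new(true));
    let cfg = Some(enable.clone());
    assert!(is_connected(&cfg));

    enable.store(false, Ordering::Relaxed); // start the simulated partition
    assert!(!is_connected(&cfg));

    enable.store(true, Ordering::Relaxed); // resolve the partition
    assert!(is_connected(&cfg));
    println!("atomic partition switch sketch ok");
}

With no flag configured the node behaves normally; in the partition test the flag is stored `false` to start the partition and `true` to resolve it, matching the `enable_partition` handling later in this diff.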
@@ -6,7 +6,6 @@ use crate::{
cluster_info::ClusterInfo,
commitment::BlockCommitmentCache,
ledger_cleanup_service::LedgerCleanupService,
partition_cfg::PartitionCfg,
poh_recorder::PohRecorder,
replay_stage::{ReplayStage, ReplayStageConfig},
retransmit_stage::RetransmitStage,
@@ -84,7 +83,7 @@ impl Tvu {
completed_slots_receiver: CompletedSlotsReceiver,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
sigverify_disabled: bool,
cfg: Option<PartitionCfg>,
cfg: Option<Arc<AtomicBool>>,
shred_version: u16,
transaction_status_sender: Option<TransactionStatusSender>,
) -> Self {
@@ -6,7 +6,6 @@ use crate::{
commitment::BlockCommitmentCache,
contact_info::ContactInfo,
gossip_service::{discover_cluster, GossipService},
partition_cfg::PartitionCfg,
poh_recorder::PohRecorder,
poh_service::PohService,
rpc::JsonRpcConfig,
@@ -66,7 +65,7 @@ pub struct ValidatorConfig {
pub snapshot_config: Option<SnapshotConfig>,
pub max_ledger_slots: Option<u64>,
pub broadcast_stage_type: BroadcastStageType,
pub partition_cfg: Option<PartitionCfg>,
pub enable_partition: Option<Arc<AtomicBool>>,
pub fixed_leader_schedule: Option<FixedSchedule>,
pub wait_for_supermajority: bool,
pub new_hard_forks: Option<Vec<Slot>>,
@@ -87,7 +86,7 @@ impl Default for ValidatorConfig {
rpc_config: JsonRpcConfig::default(),
snapshot_config: None,
broadcast_stage_type: BroadcastStageType::Standard,
partition_cfg: None,
enable_partition: None,
fixed_leader_schedule: None,
wait_for_supermajority: false,
new_hard_forks: None,
@@ -370,7 +369,7 @@ impl Validator {
completed_slots_receiver,
block_commitment_cache,
config.dev_sigverify_disabled,
config.partition_cfg.clone(),
config.enable_partition.clone(),
node.info.shred_version,
transaction_status_sender.clone(),
);
@@ -1,13 +1,11 @@
use assert_matches::assert_matches;
use log::*;
use serial_test_derive::serial;
use solana_client::rpc_client::RpcClient;
use solana_client::thin_client::create_client;
use solana_core::{
broadcast_stage::BroadcastStageType,
consensus::VOTE_THRESHOLD_DEPTH,
gossip_service::discover_cluster,
partition_cfg::{Partition, PartitionCfg},
validator::ValidatorConfig,
broadcast_stage::BroadcastStageType, consensus::VOTE_THRESHOLD_DEPTH,
gossip_service::discover_cluster, validator::ValidatorConfig,
};
use solana_ledger::{
bank_forks::SnapshotConfig, blockstore::Blockstore, leader_schedule::FixedSchedule,
@@ -18,7 +16,6 @@ use solana_local_cluster::{
cluster_tests,
local_cluster::{ClusterConfig, LocalCluster},
};
use solana_sdk::timing::timestamp;
use solana_sdk::{
client::SyncClient,
clock,
@@ -28,6 +25,7 @@ use solana_sdk::{
poh_config::PohConfig,
signature::{Keypair, KeypairUtil},
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{
collections::{HashMap, HashSet},
fs, iter,
@@ -248,7 +246,7 @@ fn run_cluster_partition(
};

let validator_pubkeys: Vec<_> = validator_keys.iter().map(|v| v.pubkey()).collect();
let mut config = ClusterConfig {
let config = ClusterConfig {
cluster_lamports,
node_stakes,
validator_configs: vec![validator_config.clone(); num_nodes],
@@ -256,71 +254,65 @@ fn run_cluster_partition(
..ClusterConfig::default()
};

let now = timestamp();
// Partition needs to start after the first few shorter warmup epochs, otherwise
// no root will be set before the partition is resolved, the leader schedule will
// not be computable, and the cluster wll halt.
let partition_epoch_start_offset = cluster_tests::time_until_nth_epoch(
partition_start_epoch,
config.slots_per_epoch,
config.stakers_slot_offset,
);
// Assume it takes <= 10 seconds for `LocalCluster::new` to boot up.
let local_cluster_boot_time = 10_000;
let partition_start = now + partition_epoch_start_offset + local_cluster_boot_time;
let partition_end = partition_start + leader_schedule_time as u64;
let mut validator_index = 0;
for (i, partition) in partitions.iter().enumerate() {
for _ in partition.iter() {
let mut p1 = Partition::default();
p1.num_partitions = partitions.len();
p1.my_partition = i;
p1.start_ts = partition_start;
p1.end_ts = partition_end;
config.validator_configs[validator_index].partition_cfg =
Some(PartitionCfg::new(vec![p1]));
validator_index += 1;
}
}
let enable_partition = Some(Arc::new(AtomicBool::new(true)));
info!(
"PARTITION_TEST starting cluster with {:?} partitions",
partitions
"PARTITION_TEST starting cluster with {:?} partitions slots_per_epoch: {}",
partitions, config.slots_per_epoch,
);
let now = Instant::now();
let mut cluster = LocalCluster::new(&config);
let elapsed = now.elapsed();
assert!(elapsed.as_millis() < local_cluster_boot_time as u128);

let now = timestamp();
let timeout = partition_start as u64 - now as u64;
info!(
"PARTITION_TEST sleeping until partition start timeout {}",
timeout
);
let mut dead_nodes = HashSet::new();
if timeout > 0 {
sleep(Duration::from_millis(timeout as u64));
let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, num_nodes).unwrap();

info!("PARTITION_TEST sleeping until partition starting condition",);
loop {
let mut reached_epoch = true;
for node in &cluster_nodes {
let node_client = RpcClient::new_socket(node.rpc);
if let Ok(epoch_info) = node_client.get_epoch_info() {
info!("slots_per_epoch: {:?}", epoch_info);
if epoch_info.slots_in_epoch <= (1 << VOTE_THRESHOLD_DEPTH) {
reached_epoch = false;
break;
}
info!("PARTITION_TEST done sleeping until partition start timeout");
let now = timestamp();
let timeout = partition_end as u64 - now as u64;
info!(
"PARTITION_TEST sleeping until partition end timeout {}",
timeout
);
} else {
reached_epoch = false;
}
}

if reached_epoch {
info!("PARTITION_TEST start partition");
enable_partition
.clone()
.unwrap()
.store(false, Ordering::Relaxed);
break;
} else {
sleep(Duration::from_millis(100));
}
}
sleep(Duration::from_millis(leader_schedule_time));

info!("PARTITION_TEST remove partition");
enable_partition.unwrap().store(true, Ordering::Relaxed);

let mut dead_nodes = HashSet::new();
let mut alive_node_contact_infos = vec![];
let should_exits: Vec<_> = partitions
.iter()
.flat_map(|p| p.iter().map(|(_, should_exit)| should_exit))
.collect();
assert_eq!(should_exits.len(), validator_pubkeys.len());
let timeout = 10;
if timeout > 0 {
// Give partitions time to propagate their blocks from durinig the partition
// Give partitions time to propagate their blocks from during the partition
// after the partition resolves
let propagation_time = leader_schedule_time;
info!("PARTITION_TEST resolving partition");
sleep(Duration::from_millis(timeout));
info!("PARTITION_TEST waiting for blocks to propagate after partition");
info!("PARTITION_TEST resolving partition. sleeping {}ms", timeout);
sleep(Duration::from_millis(10_000));
info!(
"PARTITION_TEST waiting for blocks to propagate after partition {}ms",
propagation_time
);
sleep(Duration::from_millis(propagation_time));
info!("PARTITION_TEST resuming normal operation");
for (pubkey, should_exit) in validator_pubkeys.iter().zip(should_exits) {
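The test hunk above no longer starts the partition on a wall-clock schedule; instead it polls every node's `get_epoch_info()` until `slots_in_epoch` exceeds `1 << VOTE_THRESHOLD_DEPTH`, and only then flips the atomic flag. A small sketch of just that gating condition, with the RPC polling replaced by a plain slice of reported epoch lengths (the constant value 8 below is an assumption, used only to make the example concrete):

// Sketch of the test's gating condition: only start the partition once every
// node reports an epoch longer than the deepest vote lockout (2^threshold).
// `slots_in_epoch_per_node` stands in for polling each node's RPC endpoint.
const VOTE_THRESHOLD_DEPTH: usize = 8; // assumed value for illustration

fn reached_partition_epoch(slots_in_epoch_per_node: &[u64]) -> bool {
    slots_in_epoch_per_node
        .iter()
        .all(|slots| *slots > (1 << VOTE_THRESHOLD_DEPTH))
}

fn main() {
    // Warmup epochs are short, so the partition must not start yet.
    assert!(!reached_partition_epoch(&[32, 64, 64]));
    // Every node has reached an epoch with more than 2^8 = 256 slots.
    assert!(reached_partition_epoch(&[432, 432, 432]));
    println!("epoch gate sketch ok");
}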
@@ -353,6 +345,7 @@ fn run_cluster_partition(
info!("PARTITION_TEST looking for new roots on all nodes");
let mut roots = vec![HashSet::new(); alive_node_contact_infos.len()];
let mut done = false;
let mut last_print = Instant::now();
while !done {
for (i, ingress_node) in alive_node_contact_infos.iter().enumerate() {
let client = create_client(
@@ -362,12 +355,15 @@ fn run_cluster_partition(
let slot = client.get_slot().unwrap_or(0);
roots[i].insert(slot);
let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0);
if last_print.elapsed().as_secs() > 3 {
info!("PARTITION_TEST min observed roots {}/16", min_node);
last_print = Instant::now();
}
done = min_node >= 16;
}
sleep(Duration::from_millis(clock::DEFAULT_MS_PER_SLOT / 2));
}
info!("PARTITION_TEST done spending on all node");
info!("PARTITION_TEST done waiting for roots");
}

#[allow(unused_attributes)]
@@ -424,6 +420,7 @@ fn test_kill_partition() {
leader_schedule.push(k.pubkey())
}
}
info!("leader_schedule: {}", leader_schedule.len());

run_cluster_partition(
&partitions,