diff --git a/Cargo.lock b/Cargo.lock index cc42e6da0..72e7cfd22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7608,13 +7608,17 @@ dependencies = [ name = "solana-wen-restart" version = "1.19.0" dependencies = [ + "anyhow", + "assert_matches", "log", "prost", "prost-build", "prost-types", "protobuf-src", + "rand 0.8.5", "rustc_version 0.4.0", "serial_test", + "solana-accounts-db", "solana-entry", "solana-gossip", "solana-ledger", @@ -7624,6 +7628,7 @@ dependencies = [ "solana-sdk", "solana-streamer", "solana-vote-program", + "tempfile", ] [[package]] diff --git a/core/src/repair/ancestor_hashes_service.rs b/core/src/repair/ancestor_hashes_service.rs index 87b1f49bc..8f455cbd6 100644 --- a/core/src/repair/ancestor_hashes_service.rs +++ b/core/src/repair/ancestor_hashes_service.rs @@ -1376,6 +1376,7 @@ mod test { ancestor_duplicate_slots_sender, repair_validators: None, repair_whitelist, + wen_restart_repair_slots: None, }; let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) = diff --git a/core/src/repair/repair_generic_traversal.rs b/core/src/repair/repair_generic_traversal.rs index f33a9b91e..c4b573620 100644 --- a/core/src/repair/repair_generic_traversal.rs +++ b/core/src/repair/repair_generic_traversal.rs @@ -186,7 +186,7 @@ pub fn get_closest_completion( continue; } let slot_meta = slot_meta_cache.get(&path_slot).unwrap().as_ref().unwrap(); - let new_repairs = RepairService::generate_repairs_for_slot( + let new_repairs = RepairService::generate_repairs_for_slot_throttled_by_tick( blockstore, path_slot, slot_meta, diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index f695f7b60..eb516fc74 100644 --- a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -224,6 +224,8 @@ pub struct RepairInfo { pub repair_validators: Option>, // Validators which should be given priority when serving pub repair_whitelist: Arc>>, + // A given list of slots to repair when in wen_restart + pub wen_restart_repair_slots: Option>>>, } pub struct RepairSlotRange { @@ -397,17 +399,24 @@ impl RepairService { ); add_votes_elapsed.stop(); - let repairs = repair_weight.get_best_weighted_repairs( - blockstore, - root_bank.epoch_stakes_map(), - root_bank.epoch_schedule(), - MAX_ORPHANS, - MAX_REPAIR_LENGTH, - MAX_UNKNOWN_LAST_INDEX_REPAIRS, - MAX_CLOSEST_COMPLETION_REPAIRS, - &mut repair_timing, - &mut best_repairs_stats, - ); + let repairs = match repair_info.wen_restart_repair_slots.clone() { + Some(slots_to_repair) => Self::generate_repairs_for_wen_restart( + blockstore, + MAX_REPAIR_LENGTH, + &slots_to_repair.read().unwrap(), + ), + None => repair_weight.get_best_weighted_repairs( + blockstore, + root_bank.epoch_stakes_map(), + root_bank.epoch_schedule(), + MAX_ORPHANS, + MAX_REPAIR_LENGTH, + MAX_UNKNOWN_LAST_INDEX_REPAIRS, + MAX_CLOSEST_COMPLETION_REPAIRS, + &mut repair_timing, + &mut best_repairs_stats, + ), + }; let mut popular_pruned_forks = repair_weight.get_popular_pruned_forks( root_bank.epoch_stakes_map(), @@ -618,32 +627,58 @@ impl RepairService { } } - /// If this slot is missing shreds generate repairs - pub fn generate_repairs_for_slot( + pub fn generate_repairs_for_slot_throttled_by_tick( blockstore: &Blockstore, slot: Slot, slot_meta: &SlotMeta, max_repairs: usize, ) -> Vec { + Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, true) + } + + pub fn generate_repairs_for_slot_not_throttled_by_tick( + blockstore: &Blockstore, + slot: Slot, + slot_meta: &SlotMeta, + max_repairs: usize, + ) -> Vec { 
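+        // Forwards to the private helper with throttling disabled, so repairs are generated
+        // immediately instead of waiting DEFER_REPAIR_THRESHOLD_TICKS after the last received shred.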
+ Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, false) + } + + /// If this slot is missing shreds generate repairs + fn generate_repairs_for_slot( + blockstore: &Blockstore, + slot: Slot, + slot_meta: &SlotMeta, + max_repairs: usize, + throttle_requests_by_shred_tick: bool, + ) -> Vec { + let defer_repair_threshold_ticks = if throttle_requests_by_shred_tick { + DEFER_REPAIR_THRESHOLD_TICKS + } else { + 0 + }; if max_repairs == 0 || slot_meta.is_full() { vec![] } else if slot_meta.consumed == slot_meta.received { - // check delay time of last shred - if let Some(reference_tick) = slot_meta - .received - .checked_sub(1) - .and_then(|index| blockstore.get_data_shred(slot, index).ok()?) - .and_then(|shred| shred::layout::get_reference_tick(&shred).ok()) - .map(u64::from) - { - // System time is not monotonic - let ticks_since_first_insert = DEFAULT_TICKS_PER_SECOND - * timestamp().saturating_sub(slot_meta.first_shred_timestamp) - / 1_000; - if ticks_since_first_insert - < reference_tick.saturating_add(DEFER_REPAIR_THRESHOLD_TICKS) + if throttle_requests_by_shred_tick { + // check delay time of last shred + if let Some(reference_tick) = slot_meta + .received + .checked_sub(1) + .and_then(|index| blockstore.get_data_shred(slot, index).ok()?) + .and_then(|shred| shred::layout::get_reference_tick(&shred).ok()) + .map(u64::from) { - return vec![]; + // System time is not monotonic + let ticks_since_first_insert = DEFAULT_TICKS_PER_SECOND + * timestamp().saturating_sub(slot_meta.first_shred_timestamp) + / 1_000; + if ticks_since_first_insert + < reference_tick.saturating_add(defer_repair_threshold_ticks) + { + return vec![]; + } } } vec![ShredRepairType::HighestShred(slot, slot_meta.received)] @@ -652,7 +687,7 @@ impl RepairService { .find_missing_data_indexes( slot, slot_meta.first_shred_timestamp, - DEFER_REPAIR_THRESHOLD_TICKS, + defer_repair_threshold_ticks, slot_meta.consumed, slot_meta.received, max_repairs, @@ -674,7 +709,7 @@ impl RepairService { while repairs.len() < max_repairs && !pending_slots.is_empty() { let slot = pending_slots.pop().unwrap(); if let Some(slot_meta) = blockstore.meta(slot).unwrap() { - let new_repairs = Self::generate_repairs_for_slot( + let new_repairs = Self::generate_repairs_for_slot_throttled_by_tick( blockstore, slot, &slot_meta, @@ -689,6 +724,33 @@ impl RepairService { } } + pub(crate) fn generate_repairs_for_wen_restart( + blockstore: &Blockstore, + max_repairs: usize, + slots: &Vec, + ) -> Vec { + let mut repairs: Vec = Vec::new(); + for slot in slots { + if let Some(slot_meta) = blockstore.meta(*slot).unwrap() { + // When in wen_restart, turbine is not running, so there is + // no need to wait after first shred. 
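+                // Budget each call with max_repairs - repairs.len() so the combined list never exceeds max_repairs.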
+ let new_repairs = Self::generate_repairs_for_slot_not_throttled_by_tick( + blockstore, + *slot, + &slot_meta, + max_repairs - repairs.len(), + ); + repairs.extend(new_repairs); + } else { + repairs.push(ShredRepairType::HighestShred(*slot, 0)); + } + if repairs.len() >= max_repairs { + break; + } + } + repairs + } + fn get_repair_peers( cluster_info: Arc, cluster_slots: Arc, @@ -845,7 +907,7 @@ impl RepairService { ..SlotMeta::default() }); - let new_repairs = Self::generate_repairs_for_slot( + let new_repairs = Self::generate_repairs_for_slot_throttled_by_tick( blockstore, slot, &meta, @@ -867,7 +929,7 @@ impl RepairService { // If the slot is full, no further need to repair this slot None } else { - Some(Self::generate_repairs_for_slot( + Some(Self::generate_repairs_for_slot_throttled_by_tick( blockstore, slot, &slot_meta, @@ -1548,4 +1610,63 @@ mod test { ); assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr); } + + #[test] + fn test_generate_repairs_for_wen_restart() { + solana_logger::setup(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + let max_repairs = 3; + + let slots: Vec = vec![2, 3, 5, 7]; + let num_entries_per_slot = max_ticks_per_n_shreds(3, None) + 1; + + let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot); + for (i, (mut slot_shreds, _)) in shreds.into_iter().enumerate() { + slot_shreds.remove(i); + blockstore.insert_shreds(slot_shreds, None, false).unwrap(); + } + + let mut slots_to_repair: Vec = vec![]; + + // When slots_to_repair is empty, ignore all and return empty result. + let result = RepairService::generate_repairs_for_wen_restart( + &blockstore, + max_repairs, + &slots_to_repair, + ); + assert!(result.is_empty()); + + // When asked to repair slot with missing shreds and some unknown slot, return correct results. 
+ slots_to_repair = vec![3, 81]; + let result = RepairService::generate_repairs_for_wen_restart( + &blockstore, + max_repairs, + &slots_to_repair, + ); + assert_eq!( + result, + vec![ + ShredRepairType::Shred(3, 1), + ShredRepairType::HighestShred(81, 0), + ], + ); + + // Test that it will not generate more than max_repairs. + slots_to_repair = vec![2, 82, 7, 83, 84]; + let result = RepairService::generate_repairs_for_wen_restart( + &blockstore, + max_repairs, + &slots_to_repair, + ); + assert_eq!(result.len(), max_repairs); + assert_eq!( + result, + vec![ + ShredRepairType::Shred(2, 0), + ShredRepairType::HighestShred(82, 0), + ShredRepairType::HighestShred(7, 3), + ], + ); + } } diff --git a/core/src/repair/repair_weighted_traversal.rs b/core/src/repair/repair_weighted_traversal.rs index 38682a3fd..175b3268e 100644 --- a/core/src/repair/repair_weighted_traversal.rs +++ b/core/src/repair/repair_weighted_traversal.rs @@ -98,7 +98,7 @@ pub fn get_best_repair_shreds( if let Some(slot_meta) = slot_meta { match next { Visit::Unvisited(slot) => { - let new_repairs = RepairService::generate_repairs_for_slot( + let new_repairs = RepairService::generate_repairs_for_slot_throttled_by_tick( blockstore, slot, slot_meta, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index b0fe93890..47bc9a790 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -143,6 +143,7 @@ impl Tvu { repair_quic_endpoint_sender: AsyncSender, outstanding_repair_requests: Arc>, cluster_slots: Arc, + wen_restart_repair_slots: Option>>>, ) -> Result { let TvuSockets { repair: repair_socket, @@ -214,6 +215,7 @@ impl Tvu { repair_whitelist: tvu_config.repair_whitelist, cluster_info: cluster_info.clone(), cluster_slots: cluster_slots.clone(), + wen_restart_repair_slots, }; WindowService::new( blockstore.clone(), @@ -499,6 +501,7 @@ pub mod tests { repair_quic_endpoint_sender, outstanding_repair_requests, cluster_slots, + None, ) .expect("assume success"); exit.store(true, Ordering::Relaxed); diff --git a/core/src/validator.rs b/core/src/validator.rs index 97ef0a01e..a6d5921bc 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -138,6 +138,11 @@ use { const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000; const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80; +// Right now since we reuse the wait for supermajority code, the +// following threshold should always be greater than or equal to +// WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT.
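+// The aggregation loop in wen_restart only exits once the observed active stake exceeds this
+// percentage and there are no more slots left to repair.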
+const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = + WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT; #[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)] #[strum(serialize_all = "kebab-case")] @@ -1236,6 +1241,11 @@ impl Validator { }; let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority; + let wen_restart_repair_slots = if in_wen_restart { + Some(Arc::new(RwLock::new(Vec::new()))) + } else { + None + }; let tower = match process_blockstore.process_to_create_tower() { Ok(tower) => { info!("Tower state: {:?}", tower); @@ -1310,6 +1320,7 @@ impl Validator { repair_quic_endpoint_sender, outstanding_repair_requests.clone(), cluster_slots.clone(), + wen_restart_repair_slots.clone(), )?; if in_wen_restart { @@ -1319,6 +1330,10 @@ impl Validator { last_vote, blockstore.clone(), cluster_info.clone(), + bank_forks.clone(), + wen_restart_repair_slots.clone(), + WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT, + exit.clone(), ) { Ok(()) => { return Err("wen_restart phase one completed".to_string()); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 504776db1..7d939eeea 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -248,10 +248,11 @@ fn verify_repair( .unwrap_or(true) } -fn prune_shreds_invalid_repair( +fn prune_shreds_by_repair_status( shreds: &mut Vec, repair_infos: &mut Vec>, outstanding_requests: &RwLock, + accept_repairs_only: bool, ) { assert_eq!(shreds.len(), repair_infos.len()); let mut i = 0; @@ -260,7 +261,8 @@ fn prune_shreds_invalid_repair( let mut outstanding_requests = outstanding_requests.write().unwrap(); shreds.retain(|shred| { let should_keep = ( - verify_repair(&mut outstanding_requests, shred, &repair_infos[i]), + (!accept_repairs_only || repair_infos[i].is_some()) + && verify_repair(&mut outstanding_requests, shred, &repair_infos[i]), i += 1, ) .0; @@ -288,6 +290,7 @@ fn run_insert( retransmit_sender: &Sender>, outstanding_requests: &RwLock, reed_solomon_cache: &ReedSolomonCache, + accept_repairs_only: bool, ) -> Result<()> where F: Fn(PossibleDuplicateShred), @@ -333,7 +336,12 @@ where let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed"); let num_shreds = shreds.len(); - prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests); + prune_shreds_by_repair_status( + &mut shreds, + &mut repair_infos, + outstanding_requests, + accept_repairs_only, + ); ws_metrics.num_shreds_pruned_invalid_repair = num_shreds - shreds.len(); let repairs: Vec<_> = repair_infos .iter() @@ -391,6 +399,10 @@ impl WindowService { let cluster_info = repair_info.cluster_info.clone(); let bank_forks = repair_info.bank_forks.clone(); + // In wen_restart, we discard all shreds from Turbine and keep only those from repair to + // avoid new shreds making the validator OOM before wen_restart is over.
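+        // When this flag is set, prune_shreds_by_repair_status drops every shred that does not
+        // carry a verified repair nonce.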
+ let accept_repairs_only = repair_info.wen_restart_repair_slots.is_some(); + let repair_service = RepairService::new( blockstore.clone(), exit.clone(), @@ -426,6 +438,7 @@ impl WindowService { completed_data_sets_sender, retransmit_sender, outstanding_repair_requests, + accept_repairs_only, ); WindowService { @@ -475,6 +488,7 @@ impl WindowService { completed_data_sets_sender: CompletedDataSetsSender, retransmit_sender: Sender>, outstanding_requests: Arc>, + accept_repairs_only: bool, ) -> JoinHandle<()> { let handle_error = || { inc_new_counter_error!("solana-window-insert-error", 1, 1); @@ -507,6 +521,7 @@ impl WindowService { &retransmit_sender, &outstanding_requests, &reed_solomon_cache, + accept_repairs_only, ) { ws_metrics.record_error(&e); if Self::should_exit_on_error(e, &handle_error) { @@ -743,7 +758,7 @@ mod test { 4, // position 0, // version ); - let mut shreds = vec![shred.clone(), shred.clone(), shred]; + let mut shreds = vec![shred.clone(), shred.clone(), shred.clone()]; let repair_meta = RepairMeta { nonce: 0 }; let outstanding_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default())); let repair_type = ShredRepairType::Orphan(9); @@ -753,9 +768,21 @@ mod test { .add_request(repair_type, timestamp()); let repair_meta1 = RepairMeta { nonce }; let mut repair_infos = vec![None, Some(repair_meta), Some(repair_meta1)]; - prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, &outstanding_requests); + prune_shreds_by_repair_status(&mut shreds, &mut repair_infos, &outstanding_requests, false); + assert_eq!(shreds.len(), 2); assert_eq!(repair_infos.len(), 2); assert!(repair_infos[0].is_none()); assert_eq!(repair_infos[1].as_ref().unwrap().nonce, nonce); + + shreds = vec![shred.clone(), shred.clone(), shred]; + let repair_meta2 = RepairMeta { nonce: 0 }; + let repair_meta3 = RepairMeta { nonce }; + repair_infos = vec![None, Some(repair_meta2), Some(repair_meta3)]; + // In wen_restart, we discard all Turbine shreds and only keep valid repair shreds. 
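+        // Only the shred whose repair nonce matches the outstanding request should survive; the
+        // Turbine shred (None) and the stale-nonce repair are pruned.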
+ prune_shreds_by_repair_status(&mut shreds, &mut repair_infos, &outstanding_requests, true); + assert_eq!(shreds.len(), 1); + assert_eq!(repair_infos.len(), 1); + assert!(repair_infos[0].is_some()); + assert_eq!(repair_infos[0].as_ref().unwrap().nonce, nonce); } } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 93e2a243e..0929453a9 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6581,6 +6581,7 @@ dependencies = [ name = "solana-wen-restart" version = "1.19.0" dependencies = [ + "anyhow", "log", "prost", "prost-build", diff --git a/wen-restart/Cargo.toml b/wen-restart/Cargo.toml index a2e6e5c1a..add4340cd 100644 --- a/wen-restart/Cargo.toml +++ b/wen-restart/Cargo.toml @@ -11,6 +11,7 @@ edition = { workspace = true } publish = true [dependencies] +anyhow = { workspace = true } log = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } @@ -23,9 +24,14 @@ solana-sdk = { workspace = true } solana-vote-program = { workspace = true } [dev-dependencies] +assert_matches = { workspace = true } +rand = { workspace = true } serial_test = { workspace = true } +solana-accounts-db = { workspace = true } solana-entry = { workspace = true } +solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } solana-streamer = { workspace = true } +tempfile = { workspace = true } [build-dependencies] prost-build = { workspace = true } diff --git a/wen-restart/proto/wen_restart.proto b/wen-restart/proto/wen_restart.proto index 1f6423462..b25c2f177 100644 --- a/wen-restart/proto/wen_restart.proto +++ b/wen-restart/proto/wen_restart.proto @@ -11,13 +11,19 @@ enum State { DONE = 6; } -message MyLastVotedForkSlots { - uint64 last_vote_slot = 1; +message LastVotedForkSlotsRecord { + repeated uint64 last_voted_fork_slots = 1; string last_vote_bankhash = 2; uint32 shred_version = 3; + uint64 wallclock = 4; +} + +message LastVotedForkSlotsAggregateRecord { + map received = 1; } message WenRestartProgress { State state = 1; - optional MyLastVotedForkSlots my_last_voted_fork_slots = 2; + optional LastVotedForkSlotsRecord my_last_voted_fork_slots = 2; + optional LastVotedForkSlotsAggregateRecord last_voted_fork_slots_aggregate = 3; } \ No newline at end of file diff --git a/wen-restart/src/last_voted_fork_slots_aggregate.rs b/wen-restart/src/last_voted_fork_slots_aggregate.rs new file mode 100644 index 000000000..8a26c4d31 --- /dev/null +++ b/wen-restart/src/last_voted_fork_slots_aggregate.rs @@ -0,0 +1,487 @@ +use { + crate::solana::wen_restart_proto::LastVotedForkSlotsRecord, + anyhow::Result, + log::*, + solana_gossip::restart_crds_values::RestartLastVotedForkSlots, + solana_runtime::epoch_stakes::EpochStakes, + solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}, + std::{ + collections::{HashMap, HashSet}, + str::FromStr, + }, +}; + +pub struct LastVotedForkSlotsAggregate { + root_slot: Slot, + repair_threshold: f64, + // TODO(wen): using local root's EpochStakes, need to fix if crossing Epoch boundary. 
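+    // All stake percentages below are computed against this snapshot's total_stake().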
+ epoch_stakes: EpochStakes, + last_voted_fork_slots: HashMap, + slots_stake_map: HashMap, + active_peers: HashSet, + slots_to_repair: HashSet, +} + +pub struct LastVotedForkSlotsAggregateResult { + pub slots_to_repair: Vec, + pub active_percent: f64, /* 0 ~ 100.0 */ +} + +impl LastVotedForkSlotsAggregate { + pub(crate) fn new( + root_slot: Slot, + repair_threshold: f64, + epoch_stakes: &EpochStakes, + last_voted_fork_slots: &Vec, + my_pubkey: &Pubkey, + ) -> Self { + let mut active_peers = HashSet::new(); + let sender_stake = Self::validator_stake(epoch_stakes, my_pubkey); + active_peers.insert(*my_pubkey); + let mut slots_stake_map = HashMap::new(); + for slot in last_voted_fork_slots { + if slot > &root_slot { + slots_stake_map.insert(*slot, sender_stake); + } + } + Self { + root_slot, + repair_threshold, + epoch_stakes: epoch_stakes.clone(), + last_voted_fork_slots: HashMap::new(), + slots_stake_map, + active_peers, + slots_to_repair: HashSet::new(), + } + } + + fn validator_stake(epoch_stakes: &EpochStakes, pubkey: &Pubkey) -> u64 { + epoch_stakes + .node_id_to_vote_accounts() + .get(pubkey) + .map(|x| x.total_stake) + .unwrap_or_default() + } + + pub(crate) fn aggregate_from_record( + &mut self, + key_string: &str, + record: &LastVotedForkSlotsRecord, + ) -> Result> { + let from = Pubkey::from_str(key_string)?; + let last_voted_hash = Hash::from_str(&record.last_vote_bankhash)?; + let converted_record = RestartLastVotedForkSlots::new( + from, + record.wallclock, + &record.last_voted_fork_slots, + last_voted_hash, + record.shred_version as u16, + )?; + Ok(self.aggregate(converted_record)) + } + + pub(crate) fn aggregate( + &mut self, + new_slots: RestartLastVotedForkSlots, + ) -> Option { + let total_stake = self.epoch_stakes.total_stake(); + let threshold_stake = (total_stake as f64 * self.repair_threshold) as u64; + let from = &new_slots.from; + let sender_stake = Self::validator_stake(&self.epoch_stakes, from); + if sender_stake == 0 { + warn!( + "Gossip should not accept zero-stake RestartLastVotedFork from {:?}", + from + ); + return None; + } + self.active_peers.insert(*from); + let new_slots_vec = new_slots.to_slots(self.root_slot); + let record = LastVotedForkSlotsRecord { + last_voted_fork_slots: new_slots_vec.clone(), + last_vote_bankhash: new_slots.last_voted_hash.to_string(), + shred_version: new_slots.shred_version as u32, + wallclock: new_slots.wallclock, + }; + let new_slots_set: HashSet = HashSet::from_iter(new_slots_vec); + let old_slots_set = match self.last_voted_fork_slots.insert(*from, new_slots.clone()) { + Some(old_slots) => { + if old_slots == new_slots { + return None; + } else { + HashSet::from_iter(old_slots.to_slots(self.root_slot)) + } + } + None => HashSet::new(), + }; + for slot in old_slots_set.difference(&new_slots_set) { + let entry = self.slots_stake_map.get_mut(slot).unwrap(); + *entry = entry.saturating_sub(sender_stake); + if *entry < threshold_stake { + self.slots_to_repair.remove(slot); + } + } + for slot in new_slots_set.difference(&old_slots_set) { + let entry = self.slots_stake_map.entry(*slot).or_insert(0); + *entry = entry.saturating_add(sender_stake); + if *entry >= threshold_stake { + self.slots_to_repair.insert(*slot); + } + } + Some(record) + } + + pub(crate) fn get_aggregate_result(&self) -> LastVotedForkSlotsAggregateResult { + let total_stake = self.epoch_stakes.total_stake(); + let total_active_stake = self.active_peers.iter().fold(0, |sum: u64, pubkey| { + sum.saturating_add(Self::validator_stake(&self.epoch_stakes, pubkey)) + }); 
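+        // active_peers always includes our own pubkey (inserted in new()), so our stake counts
+        // toward the active percentage.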
+ let active_percent = total_active_stake as f64 / total_stake as f64 * 100.0; + LastVotedForkSlotsAggregateResult { + slots_to_repair: self.slots_to_repair.iter().cloned().collect(), + active_percent, + } + } +} + +#[cfg(test)] +mod tests { + use { + crate::{ + last_voted_fork_slots_aggregate::LastVotedForkSlotsAggregate, + solana::wen_restart_proto::LastVotedForkSlotsRecord, + }, + solana_gossip::restart_crds_values::RestartLastVotedForkSlots, + solana_program::{clock::Slot, pubkey::Pubkey}, + solana_runtime::{ + bank::Bank, + genesis_utils::{ + create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, + }, + }, + solana_sdk::{hash::Hash, signature::Signer, timing::timestamp}, + }; + + const TOTAL_VALIDATOR_COUNT: u16 = 10; + const MY_INDEX: usize = 9; + const REPAIR_THRESHOLD: f64 = 0.42; + const SHRED_VERSION: u16 = 52; + + struct TestAggregateInitResult { + pub slots_aggregate: LastVotedForkSlotsAggregate, + pub validator_voting_keypairs: Vec, + pub root_slot: Slot, + pub last_voted_fork_slots: Vec, + } + + fn test_aggregate_init() -> TestAggregateInitResult { + solana_logger::setup(); + let validator_voting_keypairs: Vec<_> = (0..TOTAL_VALIDATOR_COUNT) + .map(|_| ValidatorVoteKeypairs::new_rand()) + .collect(); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( + 10_000, + &validator_voting_keypairs, + vec![100; validator_voting_keypairs.len()], + ); + let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); + let root_bank = bank_forks.read().unwrap().root_bank(); + let root_slot = root_bank.slot(); + let last_voted_fork_slots = vec![ + root_slot.saturating_add(1), + root_slot.saturating_add(2), + root_slot.saturating_add(3), + ]; + TestAggregateInitResult { + slots_aggregate: LastVotedForkSlotsAggregate::new( + root_slot, + REPAIR_THRESHOLD, + root_bank.epoch_stakes(root_bank.epoch()).unwrap(), + &last_voted_fork_slots, + &validator_voting_keypairs[MY_INDEX].node_keypair.pubkey(), + ), + validator_voting_keypairs, + root_slot, + last_voted_fork_slots, + } + } + + #[test] + fn test_aggregate() { + let mut test_state = test_aggregate_init(); + let root_slot = test_state.root_slot; + let initial_num_active_validators = 3; + for validator_voting_keypair in test_state + .validator_voting_keypairs + .iter() + .take(initial_num_active_validators) + { + let pubkey = validator_voting_keypair.node_keypair.pubkey(); + let now = timestamp(); + assert_eq!( + test_state.slots_aggregate.aggregate( + RestartLastVotedForkSlots::new( + pubkey, + now, + &test_state.last_voted_fork_slots, + Hash::default(), + SHRED_VERSION, + ) + .unwrap(), + ), + Some(LastVotedForkSlotsRecord { + last_voted_fork_slots: test_state.last_voted_fork_slots.clone(), + last_vote_bankhash: Hash::default().to_string(), + shred_version: SHRED_VERSION as u32, + wallclock: now, + }), + ); + } + let result = test_state.slots_aggregate.get_aggregate_result(); + let mut expected_active_percent = + (initial_num_active_validators + 1) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0; + assert_eq!(result.active_percent, expected_active_percent); + assert!(result.slots_to_repair.is_empty()); + + let new_active_validator = test_state.validator_voting_keypairs + [initial_num_active_validators + 1] + .node_keypair + .pubkey(); + let now = timestamp(); + let new_active_validator_last_voted_slots = RestartLastVotedForkSlots::new( + new_active_validator, + now, + &test_state.last_voted_fork_slots, + Hash::default(), + SHRED_VERSION, + ) + .unwrap(); + 
assert_eq!( + test_state + .slots_aggregate + .aggregate(new_active_validator_last_voted_slots), + Some(LastVotedForkSlotsRecord { + last_voted_fork_slots: test_state.last_voted_fork_slots.clone(), + last_vote_bankhash: Hash::default().to_string(), + shred_version: SHRED_VERSION as u32, + wallclock: now, + }), + ); + let result = test_state.slots_aggregate.get_aggregate_result(); + expected_active_percent = + (initial_num_active_validators + 2) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0; + assert_eq!(result.active_percent, expected_active_percent); + let mut actual_slots = Vec::from_iter(result.slots_to_repair); + actual_slots.sort(); + assert_eq!(actual_slots, test_state.last_voted_fork_slots); + + let replace_message_validator = test_state.validator_voting_keypairs[2] + .node_keypair + .pubkey(); + // Allow specific validator to replace message. + let now = timestamp(); + let replace_message_validator_last_fork = RestartLastVotedForkSlots::new( + replace_message_validator, + now, + &[root_slot + 1, root_slot + 4, root_slot + 5], + Hash::default(), + SHRED_VERSION, + ) + .unwrap(); + assert_eq!( + test_state + .slots_aggregate + .aggregate(replace_message_validator_last_fork), + Some(LastVotedForkSlotsRecord { + last_voted_fork_slots: vec![root_slot + 1, root_slot + 4, root_slot + 5], + last_vote_bankhash: Hash::default().to_string(), + shred_version: SHRED_VERSION as u32, + wallclock: now, + }), + ); + let result = test_state.slots_aggregate.get_aggregate_result(); + assert_eq!(result.active_percent, expected_active_percent); + let mut actual_slots = Vec::from_iter(result.slots_to_repair); + actual_slots.sort(); + assert_eq!(actual_slots, vec![root_slot + 1]); + + // test that zero stake validator is ignored. + let random_pubkey = Pubkey::new_unique(); + assert_eq!( + test_state.slots_aggregate.aggregate( + RestartLastVotedForkSlots::new( + random_pubkey, + timestamp(), + &[root_slot + 1, root_slot + 4, root_slot + 5], + Hash::default(), + SHRED_VERSION, + ) + .unwrap(), + ), + None, + ); + let result = test_state.slots_aggregate.get_aggregate_result(); + assert_eq!(result.active_percent, expected_active_percent); + let mut actual_slots = Vec::from_iter(result.slots_to_repair); + actual_slots.sort(); + assert_eq!(actual_slots, vec![root_slot + 1]); + } + + #[test] + fn test_aggregate_from_record() { + let mut test_state = test_aggregate_init(); + let root_slot = test_state.root_slot; + let last_vote_bankhash = Hash::new_unique(); + let time1 = timestamp(); + let record = LastVotedForkSlotsRecord { + wallclock: time1, + last_voted_fork_slots: test_state.last_voted_fork_slots.clone(), + last_vote_bankhash: last_vote_bankhash.to_string(), + shred_version: SHRED_VERSION as u32, + }; + let result = test_state.slots_aggregate.get_aggregate_result(); + assert_eq!(result.active_percent, 10.0); + assert_eq!( + test_state + .slots_aggregate + .aggregate_from_record( + &test_state.validator_voting_keypairs[0] + .node_keypair + .pubkey() + .to_string(), + &record, + ) + .unwrap(), + Some(record.clone()), + ); + let result = test_state.slots_aggregate.get_aggregate_result(); + assert_eq!(result.active_percent, 20.0); + // Now if you get the same result from Gossip again, it should be ignored. 
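+        // aggregate() returns None when the new message is identical to the one already stored
+        // for this validator.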
+ assert_eq!( + test_state.slots_aggregate.aggregate( + RestartLastVotedForkSlots::new( + test_state.validator_voting_keypairs[0] + .node_keypair + .pubkey(), + time1, + &test_state.last_voted_fork_slots, + last_vote_bankhash, + SHRED_VERSION, + ) + .unwrap(), + ), + None, + ); + + // But if it's a new record from the same validator, it will be replaced. + let time2 = timestamp(); + let last_voted_fork_slots2 = + vec![root_slot + 1, root_slot + 2, root_slot + 3, root_slot + 4]; + let last_vote_bankhash2 = Hash::new_unique(); + assert_eq!( + test_state.slots_aggregate.aggregate( + RestartLastVotedForkSlots::new( + test_state.validator_voting_keypairs[0] + .node_keypair + .pubkey(), + time2, + &last_voted_fork_slots2, + last_vote_bankhash2, + SHRED_VERSION, + ) + .unwrap(), + ), + Some(LastVotedForkSlotsRecord { + wallclock: time2, + last_voted_fork_slots: last_voted_fork_slots2.clone(), + last_vote_bankhash: last_vote_bankhash2.to_string(), + shred_version: SHRED_VERSION as u32, + }), + ); + // percentage doesn't change since it's a replace. + let result = test_state.slots_aggregate.get_aggregate_result(); + assert_eq!(result.active_percent, 20.0); + + // Record from validator with zero stake should be ignored. + assert_eq!( + test_state + .slots_aggregate + .aggregate_from_record( + &Pubkey::new_unique().to_string(), + &LastVotedForkSlotsRecord { + wallclock: timestamp(), + last_voted_fork_slots: vec![root_slot + 10, root_slot + 300], + last_vote_bankhash: Hash::new_unique().to_string(), + shred_version: SHRED_VERSION as u32, + } + ) + .unwrap(), + None, + ); + // percentage doesn't change since the previous aggregate is ignored. + let result = test_state.slots_aggregate.get_aggregate_result(); + assert_eq!(result.active_percent, 20.0); + } + + #[test] + fn test_aggregate_from_record_failures() { + solana_logger::setup(); + let mut test_state = test_aggregate_init(); + let last_vote_bankhash = Hash::new_unique(); + let mut last_voted_fork_slots_record = LastVotedForkSlotsRecord { + wallclock: timestamp(), + last_voted_fork_slots: test_state.last_voted_fork_slots, + last_vote_bankhash: last_vote_bankhash.to_string(), + shred_version: SHRED_VERSION as u32, + }; + // First test that this is a valid record. + assert_eq!( + test_state + .slots_aggregate + .aggregate_from_record( + &test_state.validator_voting_keypairs[0] + .node_keypair + .pubkey() + .to_string(), + &last_voted_fork_slots_record, + ) + .unwrap(), + Some(last_voted_fork_slots_record.clone()), + ); + // Then test that it fails if the record is invalid. + + // Invalid pubkey. + assert!(test_state + .slots_aggregate + .aggregate_from_record("invalid_pubkey", &last_voted_fork_slots_record,) + .is_err()); + + // Invalid hash. + last_voted_fork_slots_record.last_vote_bankhash.clear(); + assert!(test_state + .slots_aggregate + .aggregate_from_record( + &test_state.validator_voting_keypairs[0] + .node_keypair + .pubkey() + .to_string(), + &last_voted_fork_slots_record, + ) + .is_err()); + last_voted_fork_slots_record.last_vote_bankhash.pop(); + + // Empty last voted fork. 
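+        // An empty slot list should be rejected when the record is converted back into
+        // RestartLastVotedForkSlots, so aggregate_from_record is expected to error.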
+ last_voted_fork_slots_record.last_vote_bankhash = last_vote_bankhash.to_string(); + last_voted_fork_slots_record.last_voted_fork_slots.clear(); + assert!(test_state + .slots_aggregate + .aggregate_from_record( + &test_state.validator_voting_keypairs[0] + .node_keypair + .pubkey() + .to_string(), + &last_voted_fork_slots_record, + ) + .is_err()); + } +} diff --git a/wen-restart/src/lib.rs b/wen-restart/src/lib.rs index e58a6d04b..d389136bb 100644 --- a/wen-restart/src/lib.rs +++ b/wen-restart/src/lib.rs @@ -4,4 +4,5 @@ pub(crate) mod solana { } } +pub(crate) mod last_voted_fork_slots_aggregate; pub mod wen_restart; diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index 75e4e21ce..b14b7e4e8 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -1,59 +1,370 @@ //! The `wen-restart` module handles automatic repair during a cluster restart use { - crate::solana::wen_restart_proto::{ - MyLastVotedForkSlots, State as RestartState, WenRestartProgress, + crate::{ + last_voted_fork_slots_aggregate::LastVotedForkSlotsAggregate, + solana::wen_restart_proto::{ + self, LastVotedForkSlotsAggregateRecord, LastVotedForkSlotsRecord, + State as RestartState, WenRestartProgress, + }, + }, + anyhow::Result, log::*, prost::Message, - solana_gossip::{cluster_info::ClusterInfo, epoch_slots::MAX_SLOTS_PER_ENTRY}, + solana_gossip::{ + cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, + restart_crds_values::RestartLastVotedForkSlots, + }, solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore}, + solana_program::{clock::Slot, hash::Hash}, + solana_runtime::bank_forks::BankForks, + solana_sdk::timing::timestamp, solana_vote_program::vote_state::VoteTransaction, std::{ - fs::File, - io::{Error, Write}, + collections::{HashMap, HashSet}, + fs::{read, File}, + io::{Cursor, Write}, path::PathBuf, - sync::Arc, + str::FromStr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread::sleep, + time::Duration, }, }; +// If validators holding more than 42% of the stake have this block, repair this block locally. +const REPAIR_THRESHOLD: f64 = 0.42; + +#[derive(Debug, PartialEq)] +pub enum WenRestartError { + Exiting, + InvalidLastVoteType(VoteTransaction), + MalformedLastVotedForkSlotsProtobuf(Option), + MissingLastVotedForkSlots, + UnexpectedState(wen_restart_proto::State), +} + +impl std::fmt::Display for WenRestartError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + WenRestartError::Exiting => write!(f, "Exiting"), + WenRestartError::InvalidLastVoteType(vote) => { + write!(f, "Invalid last vote type: {:?}", vote) + } + WenRestartError::MalformedLastVotedForkSlotsProtobuf(record) => { + write!(f, "Malformed last voted fork slots protobuf: {:?}", record) + } + WenRestartError::MissingLastVotedForkSlots => { + write!(f, "Missing last voted fork slots") + } + WenRestartError::UnexpectedState(state) => { + write!(f, "Unexpected state: {:?}", state) + } + } + } +} + +impl std::error::Error for WenRestartError {} + +// We need a WenRestartProgressInternalState so that the initialize function can convert the +// protobuf written to the file into an internal data structure. It should be easily +// convertible to and from the WenRestartProgress protobuf.
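+// The state machine advances Init -> LastVotedForkSlots -> Done, and
+// increment_and_write_wen_restart_records persists each transition to the proto file.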
+#[derive(Debug, PartialEq)] +pub(crate) enum WenRestartProgressInternalState { + Init { + last_voted_fork_slots: Vec, + last_vote_bankhash: Hash, + }, + LastVotedForkSlots { + last_voted_fork_slots: Vec, + }, + Done, +} + +pub(crate) fn send_restart_last_voted_fork_slots( + cluster_info: Arc, + last_voted_fork_slots: &[Slot], + last_vote_bankhash: Hash, +) -> Result { + cluster_info.push_restart_last_voted_fork_slots(last_voted_fork_slots, last_vote_bankhash)?; + Ok(LastVotedForkSlotsRecord { + last_voted_fork_slots: last_voted_fork_slots.to_vec(), + last_vote_bankhash: last_vote_bankhash.to_string(), + shred_version: cluster_info.my_shred_version() as u32, + wallclock: timestamp(), + }) +} + +pub(crate) fn aggregate_restart_last_voted_fork_slots( + wen_restart_path: &PathBuf, + wait_for_supermajority_threshold_percent: u64, + cluster_info: Arc, + last_voted_fork_slots: &Vec, + bank_forks: Arc>, + wen_restart_repair_slots: Arc>>, + exit: Arc, + progress: &mut WenRestartProgress, +) -> Result<()> { + let root_bank; + { + root_bank = bank_forks.read().unwrap().root_bank().clone(); + } + let root_slot = root_bank.slot(); + let mut last_voted_fork_slots_aggregate = LastVotedForkSlotsAggregate::new( + root_slot, + REPAIR_THRESHOLD, + root_bank.epoch_stakes(root_bank.epoch()).unwrap(), + last_voted_fork_slots, + &cluster_info.id(), + ); + if let Some(aggregate_record) = &progress.last_voted_fork_slots_aggregate { + for (key_string, message) in &aggregate_record.received { + if let Err(e) = + last_voted_fork_slots_aggregate.aggregate_from_record(key_string, message) + { + error!("Failed to aggregate from record: {:?}", e); + } + } + } else { + progress.last_voted_fork_slots_aggregate = Some(LastVotedForkSlotsAggregateRecord { + received: HashMap::new(), + }); + } + let mut cursor = solana_gossip::crds::Cursor::default(); + let mut is_full_slots = HashSet::new(); + loop { + if exit.load(Ordering::Relaxed) { + return Err(WenRestartError::Exiting.into()); + } + let start = timestamp(); + for new_last_voted_fork_slots in cluster_info.get_restart_last_voted_fork_slots(&mut cursor) + { + let from = new_last_voted_fork_slots.from.to_string(); + if let Some(record) = + last_voted_fork_slots_aggregate.aggregate(new_last_voted_fork_slots) + { + progress + .last_voted_fork_slots_aggregate + .as_mut() + .unwrap() + .received + .insert(from, record); + } + } + let result = last_voted_fork_slots_aggregate.get_aggregate_result(); + let mut filtered_slots: Vec; + { + let my_bank_forks = bank_forks.read().unwrap(); + filtered_slots = result + .slots_to_repair + .into_iter() + .filter(|slot| { + if slot <= &root_slot || is_full_slots.contains(slot) { + return false; + } + let is_full = my_bank_forks + .get(*slot) + .map_or(false, |bank| bank.is_frozen()); + if is_full { + is_full_slots.insert(*slot); + } + !is_full + }) + .collect(); + } + filtered_slots.sort(); + info!( + "Active peers: {} Slots to repair: {:?}", + result.active_percent, &filtered_slots + ); + if filtered_slots.is_empty() + && result.active_percent > wait_for_supermajority_threshold_percent as f64 + { + *wen_restart_repair_slots.write().unwrap() = vec![]; + break; + } + { + *wen_restart_repair_slots.write().unwrap() = filtered_slots; + } + write_wen_restart_records(wen_restart_path, progress)?; + let elapsed = timestamp().saturating_sub(start); + let time_left = GOSSIP_SLEEP_MILLIS.saturating_sub(elapsed); + if time_left > 0 { + sleep(Duration::from_millis(time_left)); + } + } + Ok(()) +} + pub fn wait_for_wen_restart( wen_restart_path: 
&PathBuf, last_vote: VoteTransaction, blockstore: Arc, cluster_info: Arc, -) -> Result<(), Box> { - // repair and restart option does not work without last voted slot. - let last_vote_slot = last_vote - .last_voted_slot() - .expect("wen_restart doesn't work if local tower is wiped"); - let mut last_vote_fork: Vec = AncestorIterator::new_inclusive(last_vote_slot, &blockstore) - .take(MAX_SLOTS_PER_ENTRY) - .collect(); - info!( - "wen_restart last voted fork {} {:?}", - last_vote_slot, last_vote_fork - ); - last_vote_fork.reverse(); - // Todo(wen): add the following back in after Gossip code is checked in. - // cluster_info.push_last_voted_fork_slots(&last_voted_fork, last_vote.hash()); - // The rest of the protocol will be in another PR. - let current_progress = WenRestartProgress { - state: RestartState::Init.into(), - my_last_voted_fork_slots: Some(MyLastVotedForkSlots { - last_vote_slot, - last_vote_bankhash: last_vote.hash().to_string(), - shred_version: cluster_info.my_shred_version() as u32, - }), - }; - write_wen_restart_records(wen_restart_path, current_progress)?; - Ok(()) + bank_forks: Arc>, + wen_restart_repair_slots: Option>>>, + wait_for_supermajority_threshold_percent: u64, + exit: Arc, +) -> Result<()> { + let (mut state, mut progress) = + initialize(wen_restart_path, last_vote.clone(), blockstore.clone())?; + loop { + match &state { + WenRestartProgressInternalState::Init { + last_voted_fork_slots, + last_vote_bankhash, + } => { + progress.my_last_voted_fork_slots = Some(send_restart_last_voted_fork_slots( + cluster_info.clone(), + last_voted_fork_slots, + *last_vote_bankhash, + )?) + } + WenRestartProgressInternalState::LastVotedForkSlots { + last_voted_fork_slots, + } => aggregate_restart_last_voted_fork_slots( + wen_restart_path, + wait_for_supermajority_threshold_percent, + cluster_info.clone(), + last_voted_fork_slots, + bank_forks.clone(), + wen_restart_repair_slots.clone().unwrap(), + exit.clone(), + &mut progress, + )?, + WenRestartProgressInternalState::Done => return Ok(()), + }; + state = increment_and_write_wen_restart_records(wen_restart_path, state, &mut progress)?; + } } -fn write_wen_restart_records( +pub(crate) fn increment_and_write_wen_restart_records( records_path: &PathBuf, - new_progress: WenRestartProgress, -) -> Result<(), Error> { + current_state: WenRestartProgressInternalState, + progress: &mut WenRestartProgress, +) -> Result { + let new_state = match current_state { + WenRestartProgressInternalState::Init { + last_voted_fork_slots, + last_vote_bankhash: _, + } => { + progress.set_state(RestartState::LastVotedForkSlots); + WenRestartProgressInternalState::LastVotedForkSlots { + last_voted_fork_slots, + } + } + WenRestartProgressInternalState::LastVotedForkSlots { + last_voted_fork_slots: _, + } => { + progress.set_state(RestartState::Done); + WenRestartProgressInternalState::Done + } + WenRestartProgressInternalState::Done => { + return Err(WenRestartError::UnexpectedState(RestartState::Done).into()) + } + }; + write_wen_restart_records(records_path, progress)?; + Ok(new_state) +} + +pub(crate) fn initialize( + records_path: &PathBuf, + last_vote: VoteTransaction, + blockstore: Arc, +) -> Result<(WenRestartProgressInternalState, WenRestartProgress)> { + let progress = match read_wen_restart_records(records_path) { + Ok(progress) => progress, + Err(e) => { + let stdio_err = e.downcast_ref::(); + if stdio_err.is_some_and(|e| e.kind() == std::io::ErrorKind::NotFound) { + info!( + "wen restart proto file not found at {:?}, write init state", + 
records_path + ); + let progress = WenRestartProgress { + state: RestartState::Init.into(), + my_last_voted_fork_slots: None, + last_voted_fork_slots_aggregate: None, + }; + write_wen_restart_records(records_path, &progress)?; + progress + } else { + return Err(e); + } + } + }; + match progress.state() { + RestartState::Done => Ok((WenRestartProgressInternalState::Done, progress)), + RestartState::Init => { + let last_voted_fork_slots; + let last_vote_bankhash; + match &progress.my_last_voted_fork_slots { + Some(my_last_voted_fork_slots) => { + last_voted_fork_slots = my_last_voted_fork_slots.last_voted_fork_slots.clone(); + last_vote_bankhash = + Hash::from_str(&my_last_voted_fork_slots.last_vote_bankhash).unwrap(); + } + None => { + // repair and restart option does not work without last voted slot. + if let VoteTransaction::Vote(ref vote) = last_vote { + if let Some(last_vote_slot) = vote.last_voted_slot() { + last_vote_bankhash = vote.hash; + last_voted_fork_slots = + AncestorIterator::new_inclusive(last_vote_slot, &blockstore) + .take(RestartLastVotedForkSlots::MAX_SLOTS) + .collect(); + } else { + error!(" + Cannot find last voted slot in the tower storage, it either means that this node has never \ + voted or the tower storage is corrupted. Unfortunately, since WenRestart is a consensus protocol \ + depending on each participant to send their last voted fork slots, your validator cannot participate.\ + Please check discord for the conclusion of the WenRestart protocol, then generate a snapshot and use \ + --wait-for-supermajority to restart the validator."); + return Err(WenRestartError::MissingLastVotedForkSlots.into()); + } + } else { + return Err(WenRestartError::InvalidLastVoteType(last_vote).into()); + } + } + } + Ok(( + WenRestartProgressInternalState::Init { + last_voted_fork_slots, + last_vote_bankhash, + }, + progress, + )) + } + RestartState::LastVotedForkSlots => { + if let Some(record) = progress.my_last_voted_fork_slots.as_ref() { + Ok(( + WenRestartProgressInternalState::LastVotedForkSlots { + last_voted_fork_slots: record.last_voted_fork_slots.clone(), + }, + progress, + )) + } else { + Err(WenRestartError::MalformedLastVotedForkSlotsProtobuf(None).into()) + } + } + _ => Err(WenRestartError::UnexpectedState(progress.state()).into()), + } +} + +fn read_wen_restart_records(records_path: &PathBuf) -> Result { + let buffer = read(records_path)?; + let progress = WenRestartProgress::decode(&mut Cursor::new(buffer))?; + info!("read record {:?}", progress); + Ok(progress) +} + +pub(crate) fn write_wen_restart_records( + records_path: &PathBuf, + new_progress: &WenRestartProgress, +) -> Result<()> { // overwrite anything if exists let mut file = File::create(records_path)?; info!("writing new record {:?}", new_progress); @@ -62,61 +373,133 @@ fn write_wen_restart_records( file.write_all(&buf)?; Ok(()) } + #[cfg(test)] mod tests { use { crate::wen_restart::*, + assert_matches::assert_matches, solana_entry::entry, - solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, + solana_gossip::{ + cluster_info::ClusterInfo, + contact_info::ContactInfo, + crds::GossipRoute, + crds_value::{CrdsData, CrdsValue}, + legacy_contact_info::LegacyContactInfo, + restart_crds_values::RestartLastVotedForkSlots, + }, solana_ledger::{blockstore, get_tmp_ledger_path_auto_delete}, - solana_program::{hash::Hash, vote::state::Vote}, + solana_program::{ + hash::Hash, + vote::state::{Vote, VoteStateUpdate}, + }, + solana_runtime::{ + bank::Bank, + genesis_utils::{ + 
create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, + }, + }, solana_sdk::{ + pubkey::Pubkey, signature::{Keypair, Signer}, timing::timestamp, }, solana_streamer::socket::SocketAddrSpace, - std::{fs::read, sync::Arc}, + std::{fs::remove_file, sync::Arc, thread::Builder}, + tempfile::TempDir, }; - #[test] - fn test_wen_restart_normal_flow() { - solana_logger::setup(); - let node_keypair = Arc::new(Keypair::new()); + const SHRED_VERSION: u16 = 2; + const EXPECTED_SLOTS: usize = 400; + + fn push_restart_last_voted_fork_slots( + cluster_info: Arc, + node: &LegacyContactInfo, + expected_slots_to_repair: &[Slot], + last_vote_hash: &Hash, + node_keypair: &Keypair, + wallclock: u64, + ) { + let slots = RestartLastVotedForkSlots::new( + *node.pubkey(), + wallclock, + expected_slots_to_repair, + *last_vote_hash, + SHRED_VERSION, + ) + .unwrap(); + let entries = vec![ + CrdsValue::new_signed(CrdsData::LegacyContactInfo(node.clone()), node_keypair), + CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(slots), node_keypair), + ]; + { + let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); + for entry in entries { + assert!(gossip_crds + .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) + .is_ok()); + } + } + } + + struct WenRestartTestInitResult { + pub validator_voting_keypairs: Vec, + pub blockstore: Arc, + pub cluster_info: Arc, + pub bank_forks: Arc>, + pub last_voted_fork_slots: Vec, + pub wen_restart_proto_path: PathBuf, + } + + fn wen_restart_test_init(ledger_path: &TempDir) -> WenRestartTestInitResult { + let validator_voting_keypairs: Vec<_> = + (0..10).map(|_| ValidatorVoteKeypairs::new_rand()).collect(); + let node_keypair = Arc::new(validator_voting_keypairs[0].node_keypair.insecure_clone()); let cluster_info = Arc::new(ClusterInfo::new( { let mut contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()); - contact_info.set_shred_version(2); + contact_info.set_shred_version(SHRED_VERSION); contact_info }, - node_keypair, + node_keypair.clone(), SocketAddrSpace::Unspecified, )); - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let mut wen_restart_proto_path = ledger_path.path().to_path_buf(); - wen_restart_proto_path.push("wen_restart_status.proto"); let blockstore = Arc::new(blockstore::Blockstore::open(ledger_path.path()).unwrap()); - let expected_slots = 400; - let last_vote_slot = (MAX_SLOTS_PER_ENTRY + expected_slots).try_into().unwrap(); - let last_parent = (MAX_SLOTS_PER_ENTRY >> 1).try_into().unwrap(); - for i in 0..expected_slots { + let GenesisConfigInfo { genesis_config, .. 
} = create_genesis_config_with_vote_accounts( + 10_000, + &validator_voting_keypairs, + vec![100; validator_voting_keypairs.len()], + ); + let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); + let last_parent = (RestartLastVotedForkSlots::MAX_SLOTS >> 1) + .try_into() + .unwrap(); + let mut last_voted_fork_slots = Vec::new(); + for i in 0..EXPECTED_SLOTS { let entries = entry::create_ticks(1, 0, Hash::default()); let parent_slot = if i > 0 { - (MAX_SLOTS_PER_ENTRY + i).try_into().unwrap() + (RestartLastVotedForkSlots::MAX_SLOTS.saturating_add(i)) + .try_into() + .unwrap() } else { last_parent }; + let slot = (RestartLastVotedForkSlots::MAX_SLOTS + .saturating_add(i) + .saturating_add(1)) as Slot; let shreds = blockstore::entries_to_test_shreds( &entries, - (MAX_SLOTS_PER_ENTRY + i + 1).try_into().unwrap(), + slot, parent_slot, false, 0, true, // merkle_variant ); blockstore.insert_shreds(shreds, None, false).unwrap(); + last_voted_fork_slots.push(slot); } - // link directly to slot 1 whose distance to last_vote > MAX_SLOTS_PER_ENTRY so it will not be included. + // link directly to slot 1 whose distance to last_vote > RestartLastVotedForkSlots::MAX_SLOTS so it will not be included. let entries = entry::create_ticks(1, 0, Hash::default()); let shreds = blockstore::entries_to_test_shreds( &entries, @@ -126,27 +509,567 @@ mod tests { 0, true, // merkle_variant ); + last_voted_fork_slots.extend([last_parent, 1]); blockstore.insert_shreds(shreds, None, false).unwrap(); - let last_vote_bankhash = Hash::new_unique(); - assert!(wait_for_wen_restart( - &wen_restart_proto_path, - VoteTransaction::from(Vote::new(vec![last_vote_slot], last_vote_bankhash)), + last_voted_fork_slots.sort(); + last_voted_fork_slots.reverse(); + let mut wen_restart_proto_path = ledger_path.path().to_path_buf(); + wen_restart_proto_path.push("wen_restart_status.proto"); + let _ = remove_file(&wen_restart_proto_path); + WenRestartTestInitResult { + validator_voting_keypairs, blockstore, - cluster_info - ) - .is_ok()); - let buffer = read(wen_restart_proto_path).unwrap(); - let progress = WenRestartProgress::decode(&mut std::io::Cursor::new(buffer)).unwrap(); + cluster_info, + bank_forks, + last_voted_fork_slots, + wen_restart_proto_path, + } + } + + const WAIT_FOR_THREAD_TIMEOUT: u64 = 10_000; + + fn wait_on_expected_progress_with_timeout( + wen_restart_proto_path: PathBuf, + expected_progress: WenRestartProgress, + ) { + let start = timestamp(); + let mut progress = WenRestartProgress { + state: RestartState::Init.into(), + my_last_voted_fork_slots: None, + last_voted_fork_slots_aggregate: None, + }; + loop { + if let Ok(new_progress) = read_wen_restart_records(&wen_restart_proto_path) { + progress = new_progress; + if let Some(my_last_voted_fork_slots) = &expected_progress.my_last_voted_fork_slots + { + if let Some(record) = progress.my_last_voted_fork_slots.as_mut() { + record.wallclock = my_last_voted_fork_slots.wallclock; + } + } + if progress == expected_progress { + return; + } + } + if timestamp().saturating_sub(start) > WAIT_FOR_THREAD_TIMEOUT { + panic!( + "wait_on_expected_progress_with_timeout failed to get expected progress {:?} expected {:?}", + &progress, + expected_progress + ); + } + sleep(Duration::from_millis(10)); + } + } + + fn wen_restart_test_succeed_after_failure( + test_state: WenRestartTestInitResult, + last_vote_bankhash: Hash, + expected_progress: WenRestartProgress, + ) { + let wen_restart_proto_path_clone = test_state.wen_restart_proto_path.clone(); + // continue normally 
after the error, we should be good. + let exit = Arc::new(AtomicBool::new(false)); + let exit_clone = exit.clone(); + let last_vote_slot: Slot = test_state.last_voted_fork_slots[0]; + let wen_restart_thread_handle = Builder::new() + .name("solana-wen-restart".to_string()) + .spawn(move || { + let _ = wait_for_wen_restart( + &wen_restart_proto_path_clone, + VoteTransaction::from(Vote::new(vec![last_vote_slot], last_vote_bankhash)), + test_state.blockstore, + test_state.cluster_info, + test_state.bank_forks, + Some(Arc::new(RwLock::new(Vec::new()))), + 80, + exit_clone, + ); + }) + .unwrap(); + wait_on_expected_progress_with_timeout( + test_state.wen_restart_proto_path.clone(), + expected_progress, + ); + exit.store(true, Ordering::Relaxed); + let _ = wen_restart_thread_handle.join(); + let _ = remove_file(&test_state.wen_restart_proto_path); + } + + fn insert_and_freeze_slots( + bank_forks: Arc>, + expected_slots_to_repair: Vec, + ) { + let mut parent_bank = bank_forks.read().unwrap().root_bank(); + for slot in expected_slots_to_repair { + let mut bank_forks_rw = bank_forks.write().unwrap(); + bank_forks_rw.insert(Bank::new_from_parent( + parent_bank.clone(), + &Pubkey::default(), + slot, + )); + parent_bank = bank_forks_rw.get(slot).unwrap(); + parent_bank.freeze(); + } + } + + #[test] + fn test_wen_restart_normal_flow() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let wen_restart_repair_slots = Some(Arc::new(RwLock::new(Vec::new()))); + let test_state = wen_restart_test_init(&ledger_path); + let wen_restart_proto_path_clone = test_state.wen_restart_proto_path.clone(); + let cluster_info_clone = test_state.cluster_info.clone(); + let last_vote_slot = test_state.last_voted_fork_slots[0]; + let last_vote_bankhash = Hash::new_unique(); + let expected_slots_to_repair: Vec = + (last_vote_slot + 1..last_vote_slot + 3).collect(); + let blockstore_clone = test_state.blockstore.clone(); + let bank_forks_clone = test_state.bank_forks.clone(); + let wen_restart_thread_handle = Builder::new() + .name("solana-wen-restart".to_string()) + .spawn(move || { + assert!(wait_for_wen_restart( + &wen_restart_proto_path_clone, + VoteTransaction::from(Vote::new(vec![last_vote_slot], last_vote_bankhash)), + blockstore_clone, + cluster_info_clone, + bank_forks_clone, + wen_restart_repair_slots.clone(), + 80, + Arc::new(AtomicBool::new(false)), + ) + .is_ok()); + }) + .unwrap(); + let mut rng = rand::thread_rng(); + let mut expected_messages = HashMap::new(); + // Skip the first 2 validators, because 0 is myself, we only need 8 more to reach > 80%. + for keypairs in test_state.validator_voting_keypairs.iter().skip(2) { + let node_pubkey = keypairs.node_keypair.pubkey(); + let node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey)); + let last_vote_hash = Hash::new_unique(); + let now = timestamp(); + push_restart_last_voted_fork_slots( + test_state.cluster_info.clone(), + &node, + &expected_slots_to_repair, + &last_vote_hash, + &keypairs.node_keypair, + now, + ); + expected_messages.insert( + node_pubkey.to_string(), + LastVotedForkSlotsRecord { + last_voted_fork_slots: expected_slots_to_repair.clone(), + last_vote_bankhash: last_vote_hash.to_string(), + shred_version: SHRED_VERSION as u32, + wallclock: now, + }, + ); + } + + // Simulating successful repair of missing blocks. 
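+        // Freezing these banks makes bank.is_frozen() return true, so the aggregation loop treats
+        // the slots as repaired and can exit.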
+ insert_and_freeze_slots(test_state.bank_forks.clone(), expected_slots_to_repair); + + let _ = wen_restart_thread_handle.join(); + let progress = read_wen_restart_records(&test_state.wen_restart_proto_path).unwrap(); + let progress_start_time = progress + .my_last_voted_fork_slots + .as_ref() + .unwrap() + .wallclock; assert_eq!( progress, WenRestartProgress { - state: RestartState::Init.into(), - my_last_voted_fork_slots: Some(MyLastVotedForkSlots { - last_vote_slot, + state: RestartState::Done.into(), + my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord { + last_voted_fork_slots: test_state.last_voted_fork_slots, last_vote_bankhash: last_vote_bankhash.to_string(), - shred_version: 2, + shred_version: SHRED_VERSION as u32, + wallclock: progress_start_time, + }), + last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord { + received: expected_messages }), } ) } + + fn change_proto_file_readonly(wen_restart_proto_path: &PathBuf, readonly: bool) { + let mut perms = std::fs::metadata(wen_restart_proto_path) + .unwrap() + .permissions(); + perms.set_readonly(readonly); + std::fs::set_permissions(wen_restart_proto_path, perms).unwrap(); + } + + #[test] + fn test_wen_restart_initialize_failures() { + solana_logger::setup(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let test_state = wen_restart_test_init(&ledger_path); + let last_vote_bankhash = Hash::new_unique(); + let mut last_voted_fork_slots = test_state.last_voted_fork_slots.clone(); + last_voted_fork_slots.reverse(); + let mut file = File::create(&test_state.wen_restart_proto_path).unwrap(); + file.write_all(b"garbage").unwrap(); + assert_eq!( + initialize( + &test_state.wen_restart_proto_path, + VoteTransaction::from(Vote::new(last_voted_fork_slots.clone(), last_vote_bankhash)), + test_state.blockstore.clone() + ) + .unwrap_err() + .downcast::() + .unwrap(), + prost::DecodeError::new("invalid wire type value: 7") + ); + remove_file(&test_state.wen_restart_proto_path).unwrap(); + let invalid_last_vote = VoteTransaction::from(VoteStateUpdate::from(vec![(0, 8), (1, 1)])); + assert_eq!( + initialize( + &test_state.wen_restart_proto_path, + invalid_last_vote.clone(), + test_state.blockstore.clone() + ) + .unwrap_err() + .downcast::() + .unwrap(), + WenRestartError::InvalidLastVoteType(invalid_last_vote) + ); + let empty_last_vote = VoteTransaction::from(Vote::new(vec![], last_vote_bankhash)); + assert_eq!( + initialize( + &test_state.wen_restart_proto_path, + empty_last_vote.clone(), + test_state.blockstore.clone() + ) + .unwrap_err() + .downcast::() + .unwrap(), + WenRestartError::MissingLastVotedForkSlots, + ); + // Test the case where the file is not found. 
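+        // With no proto file on disk, initialize() should write a fresh Init record and rebuild
+        // last_voted_fork_slots from the blockstore.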
+        let _ = remove_file(&test_state.wen_restart_proto_path);
+        assert_matches!(
+            initialize(&test_state.wen_restart_proto_path, VoteTransaction::from(Vote::new(last_voted_fork_slots.clone(), last_vote_bankhash)), test_state.blockstore.clone()),
+            Ok((WenRestartProgressInternalState::Init { last_voted_fork_slots, last_vote_bankhash: bankhash }, progress)) => {
+                assert_eq!(last_voted_fork_slots, test_state.last_voted_fork_slots);
+                assert_eq!(bankhash, last_vote_bankhash);
+                assert_eq!(progress, WenRestartProgress {
+                    state: RestartState::Init.into(),
+                    my_last_voted_fork_slots: None,
+                    last_voted_fork_slots_aggregate: None,
+                });
+            }
+        );
+        let _ = write_wen_restart_records(
+            &test_state.wen_restart_proto_path,
+            &WenRestartProgress {
+                state: RestartState::LastVotedForkSlots.into(),
+                my_last_voted_fork_slots: None,
+                last_voted_fork_slots_aggregate: None,
+            },
+        );
+        assert_eq!(
+            initialize(
+                &test_state.wen_restart_proto_path,
+                VoteTransaction::from(Vote::new(last_voted_fork_slots.clone(), last_vote_bankhash)),
+                test_state.blockstore.clone()
+            )
+            .err()
+            .unwrap()
+            .to_string(),
+            "Malformed last voted fork slots protobuf: None"
+        );
+        let _ = write_wen_restart_records(
+            &test_state.wen_restart_proto_path,
+            &WenRestartProgress {
+                state: RestartState::WaitingForSupermajority.into(),
+                my_last_voted_fork_slots: None,
+                last_voted_fork_slots_aggregate: None,
+            },
+        );
+        assert_eq!(
+            initialize(
+                &test_state.wen_restart_proto_path,
+                VoteTransaction::from(Vote::new(last_voted_fork_slots, last_vote_bankhash)),
+                test_state.blockstore.clone()
+            )
+            .err()
+            .unwrap()
+            .to_string(),
+            "Unexpected state: WaitingForSupermajority"
+        );
+    }
+
+    #[test]
+    fn test_wen_restart_send_last_voted_fork_failures() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let test_state = wen_restart_test_init(&ledger_path);
+        let progress = wen_restart_proto::WenRestartProgress {
+            state: RestartState::Init.into(),
+            my_last_voted_fork_slots: None,
+            last_voted_fork_slots_aggregate: None,
+        };
+        let original_progress = progress.clone();
+        assert_eq!(
+            send_restart_last_voted_fork_slots(
+                test_state.cluster_info.clone(),
+                &[],
+                Hash::new_unique(),
+            )
+            .err()
+            .unwrap()
+            .to_string(),
+            "Last voted fork cannot be empty"
+        );
+        assert_eq!(progress, original_progress);
+        let last_vote_bankhash = Hash::new_unique();
+        let last_voted_fork_slots = test_state.last_voted_fork_slots.clone();
+        wen_restart_test_succeed_after_failure(
+            test_state,
+            last_vote_bankhash,
+            WenRestartProgress {
+                state: RestartState::LastVotedForkSlots.into(),
+                my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord {
+                    last_voted_fork_slots,
+                    last_vote_bankhash: last_vote_bankhash.to_string(),
+                    shred_version: SHRED_VERSION as u32,
+                    wallclock: 0,
+                }),
+                last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord {
+                    received: HashMap::new(),
+                }),
+            },
+        );
+    }
+
+    #[test]
+    fn test_write_wen_restart_records_failure() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let test_state = wen_restart_test_init(&ledger_path);
+        let progress = wen_restart_proto::WenRestartProgress {
+            state: RestartState::Init.into(),
+            my_last_voted_fork_slots: None,
+            last_voted_fork_slots_aggregate: None,
+        };
+        assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress).is_ok());
+        change_proto_file_readonly(&test_state.wen_restart_proto_path, true);
+        assert_eq!(
+            write_wen_restart_records(&test_state.wen_restart_proto_path, &progress)
+                .unwrap_err()
+                .downcast::<std::io::Error>()
+                .unwrap()
+                .kind(),
+            std::io::ErrorKind::PermissionDenied,
+        );
+        change_proto_file_readonly(&test_state.wen_restart_proto_path, false);
+        assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress).is_ok());
+        let last_voted_fork_slots = test_state.last_voted_fork_slots.clone();
+        let last_vote_bankhash = Hash::new_unique();
+        wen_restart_test_succeed_after_failure(
+            test_state,
+            last_vote_bankhash,
+            WenRestartProgress {
+                state: RestartState::LastVotedForkSlots.into(),
+                my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord {
+                    last_voted_fork_slots,
+                    last_vote_bankhash: last_vote_bankhash.to_string(),
+                    shred_version: SHRED_VERSION as u32,
+                    wallclock: 0,
+                }),
+                last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord {
+                    received: HashMap::new(),
+                }),
+            },
+        );
+    }
+
+    #[test]
+    fn test_wen_restart_aggregate_last_voted_fork_failures() {
+        solana_logger::setup();
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let test_state = wen_restart_test_init(&ledger_path);
+        let last_vote_slot: Slot = test_state.last_voted_fork_slots[0];
+        let last_vote_bankhash = Hash::new_unique();
+        let start_time = timestamp();
+        assert!(write_wen_restart_records(
+            &test_state.wen_restart_proto_path,
+            &WenRestartProgress {
+                state: RestartState::LastVotedForkSlots.into(),
+                my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord {
+                    last_voted_fork_slots: test_state.last_voted_fork_slots.clone(),
+                    last_vote_bankhash: last_vote_bankhash.to_string(),
+                    shred_version: SHRED_VERSION as u32,
+                    wallclock: start_time,
+                }),
+                last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord {
+                    received: HashMap::new()
+                }),
+            }
+        )
+        .is_ok());
+        let mut rng = rand::thread_rng();
+        let mut expected_messages = HashMap::new();
+        let expected_slots_to_repair: Vec<Slot> =
+            (last_vote_slot + 1..last_vote_slot + 3).collect();
+        // Skip the first 2 validators: 0 is myself, and we need 8 responses to hit 80%.
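+        // Sanity check that the test cluster really has 10 validators, so 8 responses cross 80%.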
+        assert_eq!(test_state.validator_voting_keypairs.len(), 10);
+        let progress = WenRestartProgress {
+            state: RestartState::LastVotedForkSlots.into(),
+            my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord {
+                last_voted_fork_slots: test_state.last_voted_fork_slots.clone(),
+                last_vote_bankhash: last_vote_bankhash.to_string(),
+                shred_version: SHRED_VERSION as u32,
+                wallclock: start_time,
+            }),
+            last_voted_fork_slots_aggregate: None,
+        };
+        for keypairs in test_state.validator_voting_keypairs.iter().skip(2) {
+            let wen_restart_proto_path_clone = test_state.wen_restart_proto_path.clone();
+            let cluster_info_clone = test_state.cluster_info.clone();
+            let bank_forks_clone = test_state.bank_forks.clone();
+            let exit = Arc::new(AtomicBool::new(false));
+            let exit_clone = exit.clone();
+            let mut progress_clone = progress.clone();
+            let last_voted_fork_slots = test_state.last_voted_fork_slots.clone();
+            let wen_restart_thread_handle = Builder::new()
+                .name("solana-wen-restart".to_string())
+                .spawn(move || {
+                    let _ = aggregate_restart_last_voted_fork_slots(
+                        &wen_restart_proto_path_clone,
+                        80,
+                        cluster_info_clone,
+                        &last_voted_fork_slots,
+                        bank_forks_clone,
+                        Arc::new(RwLock::new(Vec::new())),
+                        exit_clone,
+                        &mut progress_clone,
+                    );
+                })
+                .unwrap();
+            let node_pubkey = keypairs.node_keypair.pubkey();
+            let node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey));
+            let last_vote_hash = Hash::new_unique();
+            let now = timestamp();
+            push_restart_last_voted_fork_slots(
+                test_state.cluster_info.clone(),
+                &node,
+                &expected_slots_to_repair,
+                &last_vote_hash,
+                &keypairs.node_keypair,
+                now,
+            );
+            expected_messages.insert(
+                node_pubkey.to_string(),
+                LastVotedForkSlotsRecord {
+                    last_voted_fork_slots: expected_slots_to_repair.clone(),
+                    last_vote_bankhash: last_vote_hash.to_string(),
+                    shred_version: SHRED_VERSION as u32,
+                    wallclock: now,
+                },
+            );
+            // Wait for the newly pushed message to appear in the written proto file.
+            wait_on_expected_progress_with_timeout(
+                test_state.wen_restart_proto_path.clone(),
+                WenRestartProgress {
+                    state: RestartState::LastVotedForkSlots.into(),
+                    my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord {
+                        last_voted_fork_slots: test_state.last_voted_fork_slots.clone(),
+                        last_vote_bankhash: last_vote_bankhash.to_string(),
+                        shred_version: SHRED_VERSION as u32,
+                        wallclock: start_time,
+                    }),
+                    last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord {
+                        received: expected_messages.clone(),
+                    }),
+                },
+            );
+            exit.store(true, Ordering::Relaxed);
+            let _ = wen_restart_thread_handle.join();
+        }
+
+        // Simulate successful repair of the missing blocks.
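+        // Insert and freeze the expected slots so the final run after these failures
+        // can find them in bank_forks and finish in the Done state.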
+        insert_and_freeze_slots(test_state.bank_forks.clone(), expected_slots_to_repair);
+
+        let last_voted_fork_slots = test_state.last_voted_fork_slots.clone();
+        wen_restart_test_succeed_after_failure(
+            test_state,
+            last_vote_bankhash,
+            WenRestartProgress {
+                state: RestartState::Done.into(),
+                my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord {
+                    last_voted_fork_slots,
+                    last_vote_bankhash: last_vote_bankhash.to_string(),
+                    shred_version: SHRED_VERSION as u32,
+                    wallclock: start_time,
+                }),
+                last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord {
+                    received: expected_messages,
+                }),
+            },
+        );
+    }
+
+    #[test]
+    fn test_increment_and_write_wen_restart_records() {
+        solana_logger::setup();
+        let my_dir = TempDir::new().unwrap();
+        let mut wen_restart_proto_path = my_dir.path().to_path_buf();
+        wen_restart_proto_path.push("wen_restart_status.proto");
+        let last_vote_bankhash = Hash::new_unique();
+        let mut state = WenRestartProgressInternalState::Init {
+            last_voted_fork_slots: vec![0, 1],
+            last_vote_bankhash,
+        };
+        let my_last_voted_fork_slots = Some(LastVotedForkSlotsRecord {
+            last_voted_fork_slots: vec![0, 1],
+            last_vote_bankhash: last_vote_bankhash.to_string(),
+            shred_version: 0,
+            wallclock: 0,
+        });
+        let mut progress = WenRestartProgress {
+            state: RestartState::Init.into(),
+            my_last_voted_fork_slots: my_last_voted_fork_slots.clone(),
+            last_voted_fork_slots_aggregate: None,
+        };
+        for (expected_state, expected_progress) in [
+            (
+                WenRestartProgressInternalState::LastVotedForkSlots {
+                    last_voted_fork_slots: vec![0, 1],
+                },
+                WenRestartProgress {
+                    state: RestartState::LastVotedForkSlots.into(),
+                    my_last_voted_fork_slots: my_last_voted_fork_slots.clone(),
+                    last_voted_fork_slots_aggregate: None,
+                },
+            ),
+            (
+                WenRestartProgressInternalState::Done,
+                WenRestartProgress {
+                    state: RestartState::Done.into(),
+                    my_last_voted_fork_slots,
+                    last_voted_fork_slots_aggregate: None,
+                },
+            ),
+        ] {
+            state = increment_and_write_wen_restart_records(
+                &wen_restart_proto_path,
+                state,
+                &mut progress,
+            )
+            .unwrap();
+            assert_eq!(&state, &expected_state);
+            assert_eq!(&progress, &expected_progress);
+        }
+        assert_eq!(
+            increment_and_write_wen_restart_records(&wen_restart_proto_path, state, &mut progress)
+                .unwrap_err()
+                .downcast::<WenRestartError>()
+                .unwrap(),
+            WenRestartError::UnexpectedState(RestartState::Done),
+        );
+    }
 }