Wen restart aggregate last voted fork slots (#33892)

* Push and aggregate RestartLastVotedForkSlots.

* Fix API and lint errors.

* Reduce clutter.

* Put my own LastVotedForkSlots into the aggregate.

* Write LastVotedForkSlots aggregate progress into local file.

* Fix typo and name constants.

* Fix flaky test.

* Clarify the comments.

* - Use constant for wait_for_supermajority
- Avoid waiting after first shred when repair is in wen_restart

* Fix delay_after_first_shred and remove loop in wen_restart.

* Read wen_restart slots inside the loop instead.

* Discard turbine shreds while in wen_restart in window service insert rather than
shred_fetch_stage.

* Use the new Gossip API.

* Rename slots_to_repair_for_wen_restart and a few others.

* Rename a few more and list all states.

* Pipe exit down to aggregate loop so we can exit early.

* Fix import of RestartLastVotedForkSlots.

* Use the new method to generate test bank.

* Make linter happy.

* Use new bank constructor for tests.

* Fix a bad merge.

* - add new const for wen_restart
- fix the test to cover more cases
- add generate_repairs_for_slot_not_throttled_by_tick and
  generate_repairs_for_slot_throttled_by_tick to make it readable

* Add initialize and put the main logic into a loop.

* Change aggregate interface and other fixes.

* Add failure tests and tests for state transition.

* Add more tests and add ability to recover from written records in
last_voted_fork_slots_aggregate.

* Various name changes.

* We don't really care what type of error is returned.

* Wait on expected progress message in proto file instead of sleep.

* Code reorganization and cleanup.

* Make linter happy.

* Add WenRestartError.

* Split WenRestartErrors into separate errors per state.

* Revert "Split WenRestartErrors into separate erros per state."

This reverts commit 4c920cb8f8d492707560441912351cca779129f6.

* Use individual functions when testing for failures.

* Move initialization errors into initialize().

* Use anyhow instead of thiserror to generate backtrace for error (see the sketch after this list).

* Add missing Cargo.lock.

* Add error log when last_vote is missing in the tower storage.

* Change error log info.

* Change test to match exact error.
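
A minimal sketch of the anyhow pattern referenced in the list above, with hypothetical helper names (not the crate's actual error plumbing): unlike a plain thiserror enum, anyhow::Error captures a backtrace when RUST_BACKTRACE=1 and lets callers attach context without defining a new variant per failure.

    use anyhow::{anyhow, Context, Result};
    use std::{fs, path::Path};

    // Hypothetical helper: read the wen_restart progress file, attaching context
    // that a thiserror variant would otherwise have to encode.
    fn read_progress_bytes(path: &Path) -> Result<Vec<u8>> {
        fs::read(path).with_context(|| format!("failed to read {}", path.display()))
    }

    // Hypothetical helper: callers only need to know the step failed, not which
    // error type ("We don't really care what type of error is returned" above).
    fn require_last_vote(last_vote: Option<u64>) -> Result<u64> {
        last_vote.ok_or_else(|| anyhow!("last vote missing in tower storage"))
    }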
Wen 2024-03-01 18:52:47 -08:00 committed by GitHub
parent 608329b974
commit bfe44d95f4
14 changed files with 1705 additions and 109 deletions

Cargo.lock generated

@ -7608,13 +7608,17 @@ dependencies = [
name = "solana-wen-restart"
version = "1.19.0"
dependencies = [
"anyhow",
"assert_matches",
"log",
"prost",
"prost-build",
"prost-types",
"protobuf-src",
"rand 0.8.5",
"rustc_version 0.4.0",
"serial_test",
"solana-accounts-db",
"solana-entry",
"solana-gossip",
"solana-ledger",
@ -7624,6 +7628,7 @@ dependencies = [
"solana-sdk",
"solana-streamer",
"solana-vote-program",
"tempfile",
]
[[package]]


@ -1376,6 +1376,7 @@ mod test {
ancestor_duplicate_slots_sender,
repair_validators: None,
repair_whitelist,
wen_restart_repair_slots: None,
};
let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =


@ -186,7 +186,7 @@ pub fn get_closest_completion(
continue;
}
let slot_meta = slot_meta_cache.get(&path_slot).unwrap().as_ref().unwrap();
let new_repairs = RepairService::generate_repairs_for_slot(
let new_repairs = RepairService::generate_repairs_for_slot_throttled_by_tick(
blockstore,
path_slot,
slot_meta,


@ -224,6 +224,8 @@ pub struct RepairInfo {
pub repair_validators: Option<HashSet<Pubkey>>,
// Validators which should be given priority when serving
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
// A given list of slots to repair when in wen_restart
pub wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
}
pub struct RepairSlotRange {
@ -397,17 +399,24 @@ impl RepairService {
);
add_votes_elapsed.stop();
let repairs = repair_weight.get_best_weighted_repairs(
blockstore,
root_bank.epoch_stakes_map(),
root_bank.epoch_schedule(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut repair_timing,
&mut best_repairs_stats,
);
let repairs = match repair_info.wen_restart_repair_slots.clone() {
Some(slots_to_repair) => Self::generate_repairs_for_wen_restart(
blockstore,
MAX_REPAIR_LENGTH,
&slots_to_repair.read().unwrap(),
),
None => repair_weight.get_best_weighted_repairs(
blockstore,
root_bank.epoch_stakes_map(),
root_bank.epoch_schedule(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut repair_timing,
&mut best_repairs_stats,
),
};
let mut popular_pruned_forks = repair_weight.get_popular_pruned_forks(
root_bank.epoch_stakes_map(),
@ -618,32 +627,58 @@ impl RepairService {
}
}
/// If this slot is missing shreds generate repairs
pub fn generate_repairs_for_slot(
pub fn generate_repairs_for_slot_throttled_by_tick(
blockstore: &Blockstore,
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
) -> Vec<ShredRepairType> {
Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, true)
}
pub fn generate_repairs_for_slot_not_throttled_by_tick(
blockstore: &Blockstore,
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
) -> Vec<ShredRepairType> {
Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, false)
}
/// If this slot is missing shreds generate repairs
fn generate_repairs_for_slot(
blockstore: &Blockstore,
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
throttle_requests_by_shred_tick: bool,
) -> Vec<ShredRepairType> {
let defer_repair_threshold_ticks = if throttle_requests_by_shred_tick {
DEFER_REPAIR_THRESHOLD_TICKS
} else {
0
};
if max_repairs == 0 || slot_meta.is_full() {
vec![]
} else if slot_meta.consumed == slot_meta.received {
// check delay time of last shred
if let Some(reference_tick) = slot_meta
.received
.checked_sub(1)
.and_then(|index| blockstore.get_data_shred(slot, index).ok()?)
.and_then(|shred| shred::layout::get_reference_tick(&shred).ok())
.map(u64::from)
{
// System time is not monotonic
let ticks_since_first_insert = DEFAULT_TICKS_PER_SECOND
* timestamp().saturating_sub(slot_meta.first_shred_timestamp)
/ 1_000;
if ticks_since_first_insert
< reference_tick.saturating_add(DEFER_REPAIR_THRESHOLD_TICKS)
if throttle_requests_by_shred_tick {
// check delay time of last shred
if let Some(reference_tick) = slot_meta
.received
.checked_sub(1)
.and_then(|index| blockstore.get_data_shred(slot, index).ok()?)
.and_then(|shred| shred::layout::get_reference_tick(&shred).ok())
.map(u64::from)
{
return vec![];
// System time is not monotonic
let ticks_since_first_insert = DEFAULT_TICKS_PER_SECOND
* timestamp().saturating_sub(slot_meta.first_shred_timestamp)
/ 1_000;
if ticks_since_first_insert
< reference_tick.saturating_add(defer_repair_threshold_ticks)
{
return vec![];
}
}
}
vec![ShredRepairType::HighestShred(slot, slot_meta.received)]
@ -652,7 +687,7 @@ impl RepairService {
.find_missing_data_indexes(
slot,
slot_meta.first_shred_timestamp,
DEFER_REPAIR_THRESHOLD_TICKS,
defer_repair_threshold_ticks,
slot_meta.consumed,
slot_meta.received,
max_repairs,
@ -674,7 +709,7 @@ impl RepairService {
while repairs.len() < max_repairs && !pending_slots.is_empty() {
let slot = pending_slots.pop().unwrap();
if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
let new_repairs = Self::generate_repairs_for_slot(
let new_repairs = Self::generate_repairs_for_slot_throttled_by_tick(
blockstore,
slot,
&slot_meta,
@ -689,6 +724,33 @@ impl RepairService {
}
}
pub(crate) fn generate_repairs_for_wen_restart(
blockstore: &Blockstore,
max_repairs: usize,
slots: &Vec<Slot>,
) -> Vec<ShredRepairType> {
let mut repairs: Vec<ShredRepairType> = Vec::new();
for slot in slots {
if let Some(slot_meta) = blockstore.meta(*slot).unwrap() {
// When in wen_restart, turbine is not running, so there is
// no need to wait after first shred.
let new_repairs = Self::generate_repairs_for_slot_not_throttled_by_tick(
blockstore,
*slot,
&slot_meta,
max_repairs - repairs.len(),
);
repairs.extend(new_repairs);
} else {
repairs.push(ShredRepairType::HighestShred(*slot, 0));
}
if repairs.len() >= max_repairs {
break;
}
}
repairs
}
fn get_repair_peers(
cluster_info: Arc<ClusterInfo>,
cluster_slots: Arc<ClusterSlots>,
@ -845,7 +907,7 @@ impl RepairService {
..SlotMeta::default()
});
let new_repairs = Self::generate_repairs_for_slot(
let new_repairs = Self::generate_repairs_for_slot_throttled_by_tick(
blockstore,
slot,
&meta,
@ -867,7 +929,7 @@ impl RepairService {
// If the slot is full, no further need to repair this slot
None
} else {
Some(Self::generate_repairs_for_slot(
Some(Self::generate_repairs_for_slot_throttled_by_tick(
blockstore,
slot,
&slot_meta,
@ -1548,4 +1610,63 @@ mod test {
);
assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
}
#[test]
fn test_generate_repairs_for_wen_restart() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let max_repairs = 3;
let slots: Vec<u64> = vec![2, 3, 5, 7];
let num_entries_per_slot = max_ticks_per_n_shreds(3, None) + 1;
let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
for (i, (mut slot_shreds, _)) in shreds.into_iter().enumerate() {
slot_shreds.remove(i);
blockstore.insert_shreds(slot_shreds, None, false).unwrap();
}
let mut slots_to_repair: Vec<Slot> = vec![];
// When slots_to_repair is empty, ignore all and return empty result.
let result = RepairService::generate_repairs_for_wen_restart(
&blockstore,
max_repairs,
&slots_to_repair,
);
assert!(result.is_empty());
// When asked to repair slot with missing shreds and some unknown slot, return correct results.
slots_to_repair = vec![3, 81];
let result = RepairService::generate_repairs_for_wen_restart(
&blockstore,
max_repairs,
&slots_to_repair,
);
assert_eq!(
result,
vec![
ShredRepairType::Shred(3, 1),
ShredRepairType::HighestShred(81, 0),
],
);
// Test that it will not generate more than max_repairs.
slots_to_repair = vec![2, 82, 7, 83, 84];
let result = RepairService::generate_repairs_for_wen_restart(
&blockstore,
max_repairs,
&slots_to_repair,
);
assert_eq!(result.len(), max_repairs);
assert_eq!(
result,
vec![
ShredRepairType::Shred(2, 0),
ShredRepairType::HighestShred(82, 0),
ShredRepairType::HighestShred(7, 3),
],
);
}
}


@ -98,7 +98,7 @@ pub fn get_best_repair_shreds(
if let Some(slot_meta) = slot_meta {
match next {
Visit::Unvisited(slot) => {
let new_repairs = RepairService::generate_repairs_for_slot(
let new_repairs = RepairService::generate_repairs_for_slot_throttled_by_tick(
blockstore,
slot,
slot_meta,


@ -143,6 +143,7 @@ impl Tvu {
repair_quic_endpoint_sender: AsyncSender<LocalRequest>,
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
cluster_slots: Arc<ClusterSlots>,
wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
) -> Result<Self, String> {
let TvuSockets {
repair: repair_socket,
@ -214,6 +215,7 @@ impl Tvu {
repair_whitelist: tvu_config.repair_whitelist,
cluster_info: cluster_info.clone(),
cluster_slots: cluster_slots.clone(),
wen_restart_repair_slots,
};
WindowService::new(
blockstore.clone(),
@ -499,6 +501,7 @@ pub mod tests {
repair_quic_endpoint_sender,
outstanding_repair_requests,
cluster_slots,
None,
)
.expect("assume success");
exit.store(true, Ordering::Relaxed);


@ -138,6 +138,11 @@ use {
const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
// Right now, since we reuse the wait for supermajority code, the
// following threshold should always be greater than or equal to
// WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT.
const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 =
WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT;
#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
@ -1236,6 +1241,11 @@ impl Validator {
};
let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
let wen_restart_repair_slots = if in_wen_restart {
Some(Arc::new(RwLock::new(Vec::new())))
} else {
None
};
let tower = match process_blockstore.process_to_create_tower() {
Ok(tower) => {
info!("Tower state: {:?}", tower);
@ -1310,6 +1320,7 @@ impl Validator {
repair_quic_endpoint_sender,
outstanding_repair_requests.clone(),
cluster_slots.clone(),
wen_restart_repair_slots.clone(),
)?;
if in_wen_restart {
@ -1319,6 +1330,10 @@ impl Validator {
last_vote,
blockstore.clone(),
cluster_info.clone(),
bank_forks.clone(),
wen_restart_repair_slots.clone(),
WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
exit.clone(),
) {
Ok(()) => {
return Err("wen_restart phase one completed".to_string());


@ -248,10 +248,11 @@ fn verify_repair(
.unwrap_or(true)
}
fn prune_shreds_invalid_repair(
fn prune_shreds_by_repair_status(
shreds: &mut Vec<Shred>,
repair_infos: &mut Vec<Option<RepairMeta>>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
accept_repairs_only: bool,
) {
assert_eq!(shreds.len(), repair_infos.len());
let mut i = 0;
@ -260,7 +261,8 @@ fn prune_shreds_invalid_repair(
let mut outstanding_requests = outstanding_requests.write().unwrap();
shreds.retain(|shred| {
let should_keep = (
verify_repair(&mut outstanding_requests, shred, &repair_infos[i]),
(!accept_repairs_only || repair_infos[i].is_some())
&& verify_repair(&mut outstanding_requests, shred, &repair_infos[i]),
i += 1,
)
.0;
@ -288,6 +290,7 @@ fn run_insert<F>(
retransmit_sender: &Sender<Vec<ShredPayload>>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
reed_solomon_cache: &ReedSolomonCache,
accept_repairs_only: bool,
) -> Result<()>
where
F: Fn(PossibleDuplicateShred),
@ -333,7 +336,12 @@ where
let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed");
let num_shreds = shreds.len();
prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests);
prune_shreds_by_repair_status(
&mut shreds,
&mut repair_infos,
outstanding_requests,
accept_repairs_only,
);
ws_metrics.num_shreds_pruned_invalid_repair = num_shreds - shreds.len();
let repairs: Vec<_> = repair_infos
.iter()
@ -391,6 +399,10 @@ impl WindowService {
let cluster_info = repair_info.cluster_info.clone();
let bank_forks = repair_info.bank_forks.clone();
// In wen_restart, we discard all shreds from Turbine and keep only those from repair to
// avoid new shreds making the validator OOM before wen_restart is over.
let accept_repairs_only = repair_info.wen_restart_repair_slots.is_some();
let repair_service = RepairService::new(
blockstore.clone(),
exit.clone(),
@ -426,6 +438,7 @@ impl WindowService {
completed_data_sets_sender,
retransmit_sender,
outstanding_repair_requests,
accept_repairs_only,
);
WindowService {
@ -475,6 +488,7 @@ impl WindowService {
completed_data_sets_sender: CompletedDataSetsSender,
retransmit_sender: Sender<Vec<ShredPayload>>,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
accept_repairs_only: bool,
) -> JoinHandle<()> {
let handle_error = || {
inc_new_counter_error!("solana-window-insert-error", 1, 1);
@ -507,6 +521,7 @@ impl WindowService {
&retransmit_sender,
&outstanding_requests,
&reed_solomon_cache,
accept_repairs_only,
) {
ws_metrics.record_error(&e);
if Self::should_exit_on_error(e, &handle_error) {
@ -743,7 +758,7 @@ mod test {
4, // position
0, // version
);
let mut shreds = vec![shred.clone(), shred.clone(), shred];
let mut shreds = vec![shred.clone(), shred.clone(), shred.clone()];
let repair_meta = RepairMeta { nonce: 0 };
let outstanding_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default()));
let repair_type = ShredRepairType::Orphan(9);
@ -753,9 +768,21 @@ mod test {
.add_request(repair_type, timestamp());
let repair_meta1 = RepairMeta { nonce };
let mut repair_infos = vec![None, Some(repair_meta), Some(repair_meta1)];
prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, &outstanding_requests);
prune_shreds_by_repair_status(&mut shreds, &mut repair_infos, &outstanding_requests, false);
assert_eq!(shreds.len(), 2);
assert_eq!(repair_infos.len(), 2);
assert!(repair_infos[0].is_none());
assert_eq!(repair_infos[1].as_ref().unwrap().nonce, nonce);
shreds = vec![shred.clone(), shred.clone(), shred];
let repair_meta2 = RepairMeta { nonce: 0 };
let repair_meta3 = RepairMeta { nonce };
repair_infos = vec![None, Some(repair_meta2), Some(repair_meta3)];
// In wen_restart, we discard all Turbine shreds and only keep valid repair shreds.
prune_shreds_by_repair_status(&mut shreds, &mut repair_infos, &outstanding_requests, true);
assert_eq!(shreds.len(), 1);
assert_eq!(repair_infos.len(), 1);
assert!(repair_infos[0].is_some());
assert_eq!(repair_infos[0].as_ref().unwrap().nonce, nonce);
}
}


@ -6581,6 +6581,7 @@ dependencies = [
name = "solana-wen-restart"
version = "1.19.0"
dependencies = [
"anyhow",
"log",
"prost",
"prost-build",


@ -11,6 +11,7 @@ edition = { workspace = true }
publish = true
[dependencies]
anyhow = { workspace = true }
log = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
@ -23,9 +24,14 @@ solana-sdk = { workspace = true }
solana-vote-program = { workspace = true }
[dev-dependencies]
assert_matches = { workspace = true }
rand = { workspace = true }
serial_test = { workspace = true }
solana-accounts-db = { workspace = true }
solana-entry = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
solana-streamer = { workspace = true }
tempfile = { workspace = true }
[build-dependencies]
prost-build = { workspace = true }


@ -11,13 +11,19 @@ enum State {
DONE = 6;
}
message MyLastVotedForkSlots {
uint64 last_vote_slot = 1;
message LastVotedForkSlotsRecord {
repeated uint64 last_voted_fork_slots = 1;
string last_vote_bankhash = 2;
uint32 shred_version = 3;
uint64 wallclock = 4;
}
message LastVotedForkSlotsAggregateRecord {
map<string, LastVotedForkSlotsRecord> received = 1;
}
message WenRestartProgress {
State state = 1;
optional MyLastVotedForkSlots my_last_voted_fork_slots = 2;
optional LastVotedForkSlotsRecord my_last_voted_fork_slots = 2;
optional LastVotedForkSlotsAggregateRecord last_voted_fork_slots_aggregate = 3;
}
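
As an illustration of how the progress messages above might be persisted, a hedged sketch using prost's Message trait on the generated WenRestartProgress type (write_progress and read_progress are hypothetical names, not the crate's actual functions):

    use anyhow::Result;
    use prost::Message;
    use std::{fs, path::Path};

    fn write_progress(path: &Path, progress: &WenRestartProgress) -> Result<()> {
        // encode_to_vec serializes the message into protobuf wire format.
        fs::write(path, progress.encode_to_vec())?;
        Ok(())
    }

    fn read_progress(path: &Path) -> Result<WenRestartProgress> {
        // decode parses the bytes back; a truncated or corrupt file is an Err,
        // which lets wen_restart recover from written records or fail loudly.
        Ok(WenRestartProgress::decode(fs::read(path)?.as_slice())?)
    }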


@ -0,0 +1,487 @@
use {
crate::solana::wen_restart_proto::LastVotedForkSlotsRecord,
anyhow::Result,
log::*,
solana_gossip::restart_crds_values::RestartLastVotedForkSlots,
solana_runtime::epoch_stakes::EpochStakes,
solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey},
std::{
collections::{HashMap, HashSet},
str::FromStr,
},
};
pub struct LastVotedForkSlotsAggregate {
root_slot: Slot,
repair_threshold: f64,
// TODO(wen): using local root's EpochStakes, need to fix if crossing Epoch boundary.
epoch_stakes: EpochStakes,
last_voted_fork_slots: HashMap<Pubkey, RestartLastVotedForkSlots>,
slots_stake_map: HashMap<Slot, u64>,
active_peers: HashSet<Pubkey>,
slots_to_repair: HashSet<Slot>,
}
pub struct LastVotedForkSlotsAggregateResult {
pub slots_to_repair: Vec<Slot>,
pub active_percent: f64, /* 0 ~ 100.0 */
}
impl LastVotedForkSlotsAggregate {
pub(crate) fn new(
root_slot: Slot,
repair_threshold: f64,
epoch_stakes: &EpochStakes,
last_voted_fork_slots: &Vec<Slot>,
my_pubkey: &Pubkey,
) -> Self {
let mut active_peers = HashSet::new();
let sender_stake = Self::validator_stake(epoch_stakes, my_pubkey);
active_peers.insert(*my_pubkey);
let mut slots_stake_map = HashMap::new();
for slot in last_voted_fork_slots {
if slot > &root_slot {
slots_stake_map.insert(*slot, sender_stake);
}
}
Self {
root_slot,
repair_threshold,
epoch_stakes: epoch_stakes.clone(),
last_voted_fork_slots: HashMap::new(),
slots_stake_map,
active_peers,
slots_to_repair: HashSet::new(),
}
}
fn validator_stake(epoch_stakes: &EpochStakes, pubkey: &Pubkey) -> u64 {
epoch_stakes
.node_id_to_vote_accounts()
.get(pubkey)
.map(|x| x.total_stake)
.unwrap_or_default()
}
pub(crate) fn aggregate_from_record(
&mut self,
key_string: &str,
record: &LastVotedForkSlotsRecord,
) -> Result<Option<LastVotedForkSlotsRecord>> {
let from = Pubkey::from_str(key_string)?;
let last_voted_hash = Hash::from_str(&record.last_vote_bankhash)?;
let converted_record = RestartLastVotedForkSlots::new(
from,
record.wallclock,
&record.last_voted_fork_slots,
last_voted_hash,
record.shred_version as u16,
)?;
Ok(self.aggregate(converted_record))
}
pub(crate) fn aggregate(
&mut self,
new_slots: RestartLastVotedForkSlots,
) -> Option<LastVotedForkSlotsRecord> {
let total_stake = self.epoch_stakes.total_stake();
let threshold_stake = (total_stake as f64 * self.repair_threshold) as u64;
let from = &new_slots.from;
let sender_stake = Self::validator_stake(&self.epoch_stakes, from);
if sender_stake == 0 {
warn!(
"Gossip should not accept zero-stake RestartLastVotedFork from {:?}",
from
);
return None;
}
self.active_peers.insert(*from);
let new_slots_vec = new_slots.to_slots(self.root_slot);
let record = LastVotedForkSlotsRecord {
last_voted_fork_slots: new_slots_vec.clone(),
last_vote_bankhash: new_slots.last_voted_hash.to_string(),
shred_version: new_slots.shred_version as u32,
wallclock: new_slots.wallclock,
};
let new_slots_set: HashSet<Slot> = HashSet::from_iter(new_slots_vec);
let old_slots_set = match self.last_voted_fork_slots.insert(*from, new_slots.clone()) {
Some(old_slots) => {
if old_slots == new_slots {
return None;
} else {
HashSet::from_iter(old_slots.to_slots(self.root_slot))
}
}
None => HashSet::new(),
};
for slot in old_slots_set.difference(&new_slots_set) {
let entry = self.slots_stake_map.get_mut(slot).unwrap();
*entry = entry.saturating_sub(sender_stake);
if *entry < threshold_stake {
self.slots_to_repair.remove(slot);
}
}
for slot in new_slots_set.difference(&old_slots_set) {
let entry = self.slots_stake_map.entry(*slot).or_insert(0);
*entry = entry.saturating_add(sender_stake);
if *entry >= threshold_stake {
self.slots_to_repair.insert(*slot);
}
}
Some(record)
}
pub(crate) fn get_aggregate_result(&self) -> LastVotedForkSlotsAggregateResult {
let total_stake = self.epoch_stakes.total_stake();
let total_active_stake = self.active_peers.iter().fold(0, |sum: u64, pubkey| {
sum.saturating_add(Self::validator_stake(&self.epoch_stakes, pubkey))
});
let active_percent = total_active_stake as f64 / total_stake as f64 * 100.0;
LastVotedForkSlotsAggregateResult {
slots_to_repair: self.slots_to_repair.iter().cloned().collect(),
active_percent,
}
}
}
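
A hedged usage sketch of the aggregate above (bindings such as new_slots_from_gossip and my_pubkey are assumed): each gossiped RestartLastVotedForkSlots updates per-slot stake totals, and a slot enters slots_to_repair once its supporting stake reaches repair_threshold of the epoch's total stake. With the tests' 10 validators of 100 stake each and a 0.42 threshold, that is 420 of 1000 stake.

    // Assumed bindings: root_slot, root_bank, my_last_voted_fork_slots, my_pubkey,
    // and new_slots_from_gossip (a RestartLastVotedForkSlots from gossip).
    let mut aggregate = LastVotedForkSlotsAggregate::new(
        root_slot,
        0.42, // repair_threshold
        root_bank.epoch_stakes(root_bank.epoch()).unwrap(),
        &my_last_voted_fork_slots,
        &my_pubkey,
    );
    // Some(record) means new information worth persisting to the progress file;
    // None means a duplicate message or a zero-stake sender.
    if let Some(record) = aggregate.aggregate(new_slots_from_gossip) {
        // append `record` to WenRestartProgress::last_voted_fork_slots_aggregate
    }
    let result = aggregate.get_aggregate_result();
    // result.active_percent gates the supermajority wait; result.slots_to_repair
    // feeds RepairService::generate_repairs_for_wen_restart.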
#[cfg(test)]
mod tests {
use {
crate::{
last_voted_fork_slots_aggregate::LastVotedForkSlotsAggregate,
solana::wen_restart_proto::LastVotedForkSlotsRecord,
},
solana_gossip::restart_crds_values::RestartLastVotedForkSlots,
solana_program::{clock::Slot, pubkey::Pubkey},
solana_runtime::{
bank::Bank,
genesis_utils::{
create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs,
},
},
solana_sdk::{hash::Hash, signature::Signer, timing::timestamp},
};
const TOTAL_VALIDATOR_COUNT: u16 = 10;
const MY_INDEX: usize = 9;
const REPAIR_THRESHOLD: f64 = 0.42;
const SHRED_VERSION: u16 = 52;
struct TestAggregateInitResult {
pub slots_aggregate: LastVotedForkSlotsAggregate,
pub validator_voting_keypairs: Vec<ValidatorVoteKeypairs>,
pub root_slot: Slot,
pub last_voted_fork_slots: Vec<Slot>,
}
fn test_aggregate_init() -> TestAggregateInitResult {
solana_logger::setup();
let validator_voting_keypairs: Vec<_> = (0..TOTAL_VALIDATOR_COUNT)
.map(|_| ValidatorVoteKeypairs::new_rand())
.collect();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts(
10_000,
&validator_voting_keypairs,
vec![100; validator_voting_keypairs.len()],
);
let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config);
let root_bank = bank_forks.read().unwrap().root_bank();
let root_slot = root_bank.slot();
let last_voted_fork_slots = vec![
root_slot.saturating_add(1),
root_slot.saturating_add(2),
root_slot.saturating_add(3),
];
TestAggregateInitResult {
slots_aggregate: LastVotedForkSlotsAggregate::new(
root_slot,
REPAIR_THRESHOLD,
root_bank.epoch_stakes(root_bank.epoch()).unwrap(),
&last_voted_fork_slots,
&validator_voting_keypairs[MY_INDEX].node_keypair.pubkey(),
),
validator_voting_keypairs,
root_slot,
last_voted_fork_slots,
}
}
#[test]
fn test_aggregate() {
let mut test_state = test_aggregate_init();
let root_slot = test_state.root_slot;
let initial_num_active_validators = 3;
for validator_voting_keypair in test_state
.validator_voting_keypairs
.iter()
.take(initial_num_active_validators)
{
let pubkey = validator_voting_keypair.node_keypair.pubkey();
let now = timestamp();
assert_eq!(
test_state.slots_aggregate.aggregate(
RestartLastVotedForkSlots::new(
pubkey,
now,
&test_state.last_voted_fork_slots,
Hash::default(),
SHRED_VERSION,
)
.unwrap(),
),
Some(LastVotedForkSlotsRecord {
last_voted_fork_slots: test_state.last_voted_fork_slots.clone(),
last_vote_bankhash: Hash::default().to_string(),
shred_version: SHRED_VERSION as u32,
wallclock: now,
}),
);
}
let result = test_state.slots_aggregate.get_aggregate_result();
let mut expected_active_percent =
(initial_num_active_validators + 1) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0;
assert_eq!(result.active_percent, expected_active_percent);
assert!(result.slots_to_repair.is_empty());
let new_active_validator = test_state.validator_voting_keypairs
[initial_num_active_validators + 1]
.node_keypair
.pubkey();
let now = timestamp();
let new_active_validator_last_voted_slots = RestartLastVotedForkSlots::new(
new_active_validator,
now,
&test_state.last_voted_fork_slots,
Hash::default(),
SHRED_VERSION,
)
.unwrap();
assert_eq!(
test_state
.slots_aggregate
.aggregate(new_active_validator_last_voted_slots),
Some(LastVotedForkSlotsRecord {
last_voted_fork_slots: test_state.last_voted_fork_slots.clone(),
last_vote_bankhash: Hash::default().to_string(),
shred_version: SHRED_VERSION as u32,
wallclock: now,
}),
);
let result = test_state.slots_aggregate.get_aggregate_result();
expected_active_percent =
(initial_num_active_validators + 2) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0;
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
actual_slots.sort();
assert_eq!(actual_slots, test_state.last_voted_fork_slots);
let replace_message_validator = test_state.validator_voting_keypairs[2]
.node_keypair
.pubkey();
// Allow specific validator to replace message.
let now = timestamp();
let replace_message_validator_last_fork = RestartLastVotedForkSlots::new(
replace_message_validator,
now,
&[root_slot + 1, root_slot + 4, root_slot + 5],
Hash::default(),
SHRED_VERSION,
)
.unwrap();
assert_eq!(
test_state
.slots_aggregate
.aggregate(replace_message_validator_last_fork),
Some(LastVotedForkSlotsRecord {
last_voted_fork_slots: vec![root_slot + 1, root_slot + 4, root_slot + 5],
last_vote_bankhash: Hash::default().to_string(),
shred_version: SHRED_VERSION as u32,
wallclock: now,
}),
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
actual_slots.sort();
assert_eq!(actual_slots, vec![root_slot + 1]);
// test that zero stake validator is ignored.
let random_pubkey = Pubkey::new_unique();
assert_eq!(
test_state.slots_aggregate.aggregate(
RestartLastVotedForkSlots::new(
random_pubkey,
timestamp(),
&[root_slot + 1, root_slot + 4, root_slot + 5],
Hash::default(),
SHRED_VERSION,
)
.unwrap(),
),
None,
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
actual_slots.sort();
assert_eq!(actual_slots, vec![root_slot + 1]);
}
#[test]
fn test_aggregate_from_record() {
let mut test_state = test_aggregate_init();
let root_slot = test_state.root_slot;
let last_vote_bankhash = Hash::new_unique();
let time1 = timestamp();
let record = LastVotedForkSlotsRecord {
wallclock: time1,
last_voted_fork_slots: test_state.last_voted_fork_slots.clone(),
last_vote_bankhash: last_vote_bankhash.to_string(),
shred_version: SHRED_VERSION as u32,
};
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 10.0);
assert_eq!(
test_state
.slots_aggregate
.aggregate_from_record(
&test_state.validator_voting_keypairs[0]
.node_keypair
.pubkey()
.to_string(),
&record,
)
.unwrap(),
Some(record.clone()),
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
// Now if you get the same result from Gossip again, it should be ignored.
assert_eq!(
test_state.slots_aggregate.aggregate(
RestartLastVotedForkSlots::new(
test_state.validator_voting_keypairs[0]
.node_keypair
.pubkey(),
time1,
&test_state.last_voted_fork_slots,
last_vote_bankhash,
SHRED_VERSION,
)
.unwrap(),
),
None,
);
// But if it's a new record from the same validator, it will be replaced.
let time2 = timestamp();
let last_voted_fork_slots2 =
vec![root_slot + 1, root_slot + 2, root_slot + 3, root_slot + 4];
let last_vote_bankhash2 = Hash::new_unique();
assert_eq!(
test_state.slots_aggregate.aggregate(
RestartLastVotedForkSlots::new(
test_state.validator_voting_keypairs[0]
.node_keypair
.pubkey(),
time2,
&last_voted_fork_slots2,
last_vote_bankhash2,
SHRED_VERSION,
)
.unwrap(),
),
Some(LastVotedForkSlotsRecord {
wallclock: time2,
last_voted_fork_slots: last_voted_fork_slots2.clone(),
last_vote_bankhash: last_vote_bankhash2.to_string(),
shred_version: SHRED_VERSION as u32,
}),
);
// percentage doesn't change since it's a replace.
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
// Record from validator with zero stake should be ignored.
assert_eq!(
test_state
.slots_aggregate
.aggregate_from_record(
&Pubkey::new_unique().to_string(),
&LastVotedForkSlotsRecord {
wallclock: timestamp(),
last_voted_fork_slots: vec![root_slot + 10, root_slot + 300],
last_vote_bankhash: Hash::new_unique().to_string(),
shred_version: SHRED_VERSION as u32,
}
)
.unwrap(),
None,
);
// percentage doesn't change since the previous aggregate is ignored.
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
}
#[test]
fn test_aggregate_from_record_failures() {
solana_logger::setup();
let mut test_state = test_aggregate_init();
let last_vote_bankhash = Hash::new_unique();
let mut last_voted_fork_slots_record = LastVotedForkSlotsRecord {
wallclock: timestamp(),
last_voted_fork_slots: test_state.last_voted_fork_slots,
last_vote_bankhash: last_vote_bankhash.to_string(),
shred_version: SHRED_VERSION as u32,
};
// First test that this is a valid record.
assert_eq!(
test_state
.slots_aggregate
.aggregate_from_record(
&test_state.validator_voting_keypairs[0]
.node_keypair
.pubkey()
.to_string(),
&last_voted_fork_slots_record,
)
.unwrap(),
Some(last_voted_fork_slots_record.clone()),
);
// Then test that it fails if the record is invalid.
// Invalid pubkey.
assert!(test_state
.slots_aggregate
.aggregate_from_record("invalid_pubkey", &last_voted_fork_slots_record,)
.is_err());
// Invalid hash.
last_voted_fork_slots_record.last_vote_bankhash.clear();
assert!(test_state
.slots_aggregate
.aggregate_from_record(
&test_state.validator_voting_keypairs[0]
.node_keypair
.pubkey()
.to_string(),
&last_voted_fork_slots_record,
)
.is_err());
last_voted_fork_slots_record.last_vote_bankhash.pop();
// Empty last voted fork.
last_voted_fork_slots_record.last_vote_bankhash = last_vote_bankhash.to_string();
last_voted_fork_slots_record.last_voted_fork_slots.clear();
assert!(test_state
.slots_aggregate
.aggregate_from_record(
&test_state.validator_voting_keypairs[0]
.node_keypair
.pubkey()
.to_string(),
&last_voted_fork_slots_record,
)
.is_err());
}
}


@ -4,4 +4,5 @@ pub(crate) mod solana {
}
}
pub(crate) mod last_voted_fork_slots_aggregate;
pub mod wen_restart;

File diff suppressed because it is too large.