wen-restart: Find heaviest fork (#183)

* Pass the final result of LastVotedForkSlots aggregation to next
stage and find the heaviest fork we will Gossip to others.

* Change comments.

* Small fixes to address PR comments.

* Move correctness proof to SIMD.

* Fix a broken merge.

* Use blockstore to check parent slot of any block in FindHeaviestFork

* Change error message.

* Add special message when first slot in the list doesn't link to root.
This commit is contained in:
Wen 2024-03-20 19:38:46 -07:00 committed by GitHub
parent f194f70e68
commit 11aa06d24f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 476 additions and 71 deletions

View File

@ -20,10 +20,23 @@ message LastVotedForkSlotsRecord {
message LastVotedForkSlotsAggregateRecord {
map<string, LastVotedForkSlotsRecord> received = 1;
optional LastVotedForkSlotsAggregateFinal final_result = 2;
}
message LastVotedForkSlotsAggregateFinal {
map<uint64, uint64> slots_stake_map = 1;
uint64 total_active_stake = 2;
}
message HeaviestFork {
uint64 slot = 1;
string bankhash = 2;
uint64 total_active_stake = 3;
}
message WenRestartProgress {
State state = 1;
optional LastVotedForkSlotsRecord my_last_voted_fork_slots = 2;
optional LastVotedForkSlotsAggregateRecord last_voted_fork_slots_aggregate = 3;
optional HeaviestFork my_heaviest_fork = 4;
}

View File

@ -22,6 +22,12 @@ pub struct LastVotedForkSlotsAggregate {
slots_to_repair: HashSet<Slot>,
}
#[derive(Clone, Debug, PartialEq)]
pub struct LastVotedForkSlotsFinalResult {
pub slots_stake_map: HashMap<Slot, u64>,
pub total_active_stake: u64,
}
impl LastVotedForkSlotsAggregate {
pub(crate) fn new(
root_slot: Slot,
@ -35,7 +41,7 @@ impl LastVotedForkSlotsAggregate {
active_peers.insert(*my_pubkey);
let mut slots_stake_map = HashMap::new();
for slot in last_voted_fork_slots {
if slot > &root_slot {
if slot >= &root_slot {
slots_stake_map.insert(*slot, sender_stake);
}
}
@ -137,6 +143,21 @@ impl LastVotedForkSlotsAggregate {
pub(crate) fn slots_to_repair_iter(&self) -> impl Iterator<Item = &Slot> {
self.slots_to_repair.iter()
}
// TODO(wen): use better epoch stake and add a test later.
fn total_active_stake(&self) -> u64 {
self.active_peers.iter().fold(0, |sum: u64, pubkey| {
sum.saturating_add(Self::validator_stake(&self.epoch_stakes, pubkey))
})
}
pub(crate) fn get_final_result(self) -> LastVotedForkSlotsFinalResult {
let total_active_stake = self.total_active_stake();
LastVotedForkSlotsFinalResult {
slots_stake_map: self.slots_stake_map,
total_active_stake,
}
}
}
#[cfg(test)]

View File

@ -2,10 +2,13 @@
use {
crate::{
last_voted_fork_slots_aggregate::LastVotedForkSlotsAggregate,
last_voted_fork_slots_aggregate::{
LastVotedForkSlotsAggregate, LastVotedForkSlotsFinalResult,
},
solana::wen_restart_proto::{
self, LastVotedForkSlotsAggregateRecord, LastVotedForkSlotsRecord,
State as RestartState, WenRestartProgress,
self, HeaviestFork, LastVotedForkSlotsAggregateFinal,
LastVotedForkSlotsAggregateRecord, LastVotedForkSlotsRecord, State as RestartState,
WenRestartProgress,
},
},
anyhow::Result,
@ -37,9 +40,15 @@ use {
// If >42% of the validators have this block, repair this block locally.
const REPAIR_THRESHOLD: f64 = 0.42;
// When counting Heaviest Fork, only count those with no less than
// 67% - 5% - (100% - active_stake) = active_stake - 38% stake.
const HEAVIEST_FORK_THRESHOLD_DELTA: f64 = 0.38;
#[derive(Debug, PartialEq)]
pub enum WenRestartError {
BlockNotFound(Slot),
BlockNotLinkedToExpectedParent(Slot, Option<Slot>, Slot),
ChildStakeLargerThanParent(Slot, u64, Slot, u64),
Exiting,
InvalidLastVoteType(VoteTransaction),
MalformedLastVotedForkSlotsProtobuf(Option<LastVotedForkSlotsRecord>),
@ -50,6 +59,28 @@ pub enum WenRestartError {
impl std::fmt::Display for WenRestartError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WenRestartError::BlockNotFound(slot) => {
write!(f, "Block not found: {}", slot)
}
WenRestartError::BlockNotLinkedToExpectedParent(slot, parent, expected_parent) => {
write!(
f,
"Block {} is not linked to expected parent {} but to {:?}",
slot, expected_parent, parent
)
}
WenRestartError::ChildStakeLargerThanParent(
slot,
child_stake,
parent,
parent_stake,
) => {
write!(
f,
"Block {} has more stake {} than its parent {} with stake {}",
slot, child_stake, parent, parent_stake
)
}
WenRestartError::Exiting => write!(f, "Exiting"),
WenRestartError::InvalidLastVoteType(vote) => {
write!(f, "Invalid last vote type: {:?}", vote)
@ -80,6 +111,11 @@ pub(crate) enum WenRestartProgressInternalState {
},
LastVotedForkSlots {
last_voted_fork_slots: Vec<Slot>,
aggregate_final_result: Option<LastVotedForkSlotsFinalResult>,
},
FindHeaviestFork {
aggregate_final_result: LastVotedForkSlotsFinalResult,
my_heaviest_fork: Option<HeaviestFork>,
},
Done,
}
@ -108,7 +144,7 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots(
wen_restart_repair_slots: Arc<RwLock<Vec<Slot>>>,
exit: Arc<AtomicBool>,
progress: &mut WenRestartProgress,
) -> Result<()> {
) -> Result<LastVotedForkSlotsFinalResult> {
let root_bank;
{
root_bank = bank_forks.read().unwrap().root_bank().clone();
@ -132,6 +168,7 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots(
} else {
progress.last_voted_fork_slots_aggregate = Some(LastVotedForkSlotsAggregateRecord {
received: HashMap::new(),
final_result: None,
});
}
let mut cursor = solana_gossip::crds::Cursor::default();
@ -198,7 +235,64 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots(
sleep(Duration::from_millis(time_left));
}
}
Ok(())
Ok(last_voted_fork_slots_aggregate.get_final_result())
}
// Verify that all blocks with at least (active_stake_percnet - 38%) of the stake form a
// single chain from the root, and use the highest slot in the blocks as the heaviest fork.
// Please see SIMD 46 "gossip current heaviest fork" for correctness proof.
pub(crate) fn find_heaviest_fork(
aggregate_final_result: LastVotedForkSlotsFinalResult,
bank_forks: Arc<RwLock<BankForks>>,
blockstore: Arc<Blockstore>,
exit: Arc<AtomicBool>,
) -> Result<(Slot, Hash)> {
// Because everything else is stopped, it's okay to grab a big lock on bank_forks.
let my_bank_forks = bank_forks.read().unwrap();
let root_bank = my_bank_forks.root_bank().clone();
let root_slot = root_bank.slot();
// TODO: Should use better epoch_stakes later.
let epoch_stake = root_bank.epoch_stakes(root_bank.epoch()).unwrap();
let total_stake = epoch_stake.total_stake();
let stake_threshold = aggregate_final_result
.total_active_stake
.saturating_sub((HEAVIEST_FORK_THRESHOLD_DELTA * total_stake as f64) as u64);
let mut slots = aggregate_final_result
.slots_stake_map
.iter()
.filter(|(slot, stake)| **slot > root_slot && **stake > stake_threshold)
.map(|(slot, _)| *slot)
.collect::<Vec<Slot>>();
slots.sort();
let mut expected_parent = root_slot;
for slot in slots {
if exit.load(Ordering::Relaxed) {
return Err(WenRestartError::Exiting.into());
}
if let Ok(Some(block_meta)) = blockstore.meta(slot) {
if block_meta.parent_slot != Some(expected_parent) {
if expected_parent == root_slot {
error!("First block {} in repair list not linked to local root {}, this could mean our root is too old",
slot, root_slot);
} else {
error!(
"Block {} in blockstore is not linked to expected parent from Wen Restart {} but to Block {:?}",
slot, expected_parent, block_meta.parent_slot
);
}
return Err(WenRestartError::BlockNotLinkedToExpectedParent(
slot,
block_meta.parent_slot,
expected_parent,
)
.into());
}
expected_parent = slot;
} else {
return Err(WenRestartError::BlockNotFound(slot).into());
}
}
Ok((expected_parent, Hash::default()))
}
pub fn wait_for_wen_restart(
@ -214,30 +308,74 @@ pub fn wait_for_wen_restart(
let (mut state, mut progress) =
initialize(wen_restart_path, last_vote.clone(), blockstore.clone())?;
loop {
match &state {
state = match state {
WenRestartProgressInternalState::Init {
last_voted_fork_slots,
last_vote_bankhash,
} => {
progress.my_last_voted_fork_slots = Some(send_restart_last_voted_fork_slots(
cluster_info.clone(),
&last_voted_fork_slots,
last_vote_bankhash,
)?);
WenRestartProgressInternalState::Init {
last_voted_fork_slots,
*last_vote_bankhash,
)?)
last_vote_bankhash,
}
}
WenRestartProgressInternalState::LastVotedForkSlots {
last_voted_fork_slots,
} => aggregate_restart_last_voted_fork_slots(
wen_restart_path,
wait_for_supermajority_threshold_percent,
cluster_info.clone(),
last_voted_fork_slots,
bank_forks.clone(),
blockstore.clone(),
wen_restart_repair_slots.clone().unwrap(),
exit.clone(),
&mut progress,
)?,
aggregate_final_result,
} => {
let final_result = match aggregate_final_result {
Some(result) => result,
None => aggregate_restart_last_voted_fork_slots(
wen_restart_path,
wait_for_supermajority_threshold_percent,
cluster_info.clone(),
&last_voted_fork_slots,
bank_forks.clone(),
blockstore.clone(),
wen_restart_repair_slots.clone().unwrap(),
exit.clone(),
&mut progress,
)?,
};
WenRestartProgressInternalState::LastVotedForkSlots {
last_voted_fork_slots,
aggregate_final_result: Some(final_result),
}
}
WenRestartProgressInternalState::FindHeaviestFork {
aggregate_final_result,
my_heaviest_fork,
} => {
let heaviest_fork = match my_heaviest_fork {
Some(heaviest_fork) => heaviest_fork,
None => {
let total_active_stake = aggregate_final_result.total_active_stake;
let (slot, bankhash) = find_heaviest_fork(
aggregate_final_result.clone(),
bank_forks.clone(),
blockstore.clone(),
exit.clone(),
)?;
info!(
"Heaviest fork found: slot: {}, bankhash: {}",
slot, bankhash
);
HeaviestFork {
slot,
bankhash: bankhash.to_string(),
total_active_stake,
}
}
};
WenRestartProgressInternalState::FindHeaviestFork {
aggregate_final_result,
my_heaviest_fork: Some(heaviest_fork),
}
}
WenRestartProgressInternalState::Done => return Ok(()),
};
state = increment_and_write_wen_restart_records(wen_restart_path, state, &mut progress)?;
@ -257,13 +395,42 @@ pub(crate) fn increment_and_write_wen_restart_records(
progress.set_state(RestartState::LastVotedForkSlots);
WenRestartProgressInternalState::LastVotedForkSlots {
last_voted_fork_slots,
aggregate_final_result: None,
}
}
WenRestartProgressInternalState::LastVotedForkSlots {
last_voted_fork_slots: _,
aggregate_final_result,
} => {
progress.set_state(RestartState::Done);
WenRestartProgressInternalState::Done
if let Some(aggregate_final_result) = aggregate_final_result {
progress.set_state(RestartState::HeaviestFork);
if let Some(aggregate_record) = progress.last_voted_fork_slots_aggregate.as_mut() {
aggregate_record.final_result = Some(LastVotedForkSlotsAggregateFinal {
slots_stake_map: aggregate_final_result.slots_stake_map.clone(),
total_active_stake: aggregate_final_result.total_active_stake,
});
}
WenRestartProgressInternalState::FindHeaviestFork {
aggregate_final_result,
my_heaviest_fork: None,
}
} else {
return Err(
WenRestartError::UnexpectedState(RestartState::LastVotedForkSlots).into(),
);
}
}
WenRestartProgressInternalState::FindHeaviestFork {
aggregate_final_result: _,
my_heaviest_fork,
} => {
if let Some(my_heaviest_fork) = my_heaviest_fork {
progress.set_state(RestartState::Done);
progress.my_heaviest_fork = Some(my_heaviest_fork.clone());
WenRestartProgressInternalState::Done
} else {
return Err(WenRestartError::UnexpectedState(RestartState::HeaviestFork).into());
}
}
WenRestartProgressInternalState::Done => {
return Err(WenRestartError::UnexpectedState(RestartState::Done).into())
@ -289,8 +456,7 @@ pub(crate) fn initialize(
);
let progress = WenRestartProgress {
state: RestartState::Init.into(),
my_last_voted_fork_slots: None,
last_voted_fork_slots_aggregate: None,
..Default::default()
};
write_wen_restart_records(records_path, &progress)?;
progress
@ -346,6 +512,17 @@ pub(crate) fn initialize(
Ok((
WenRestartProgressInternalState::LastVotedForkSlots {
last_voted_fork_slots: record.last_voted_fork_slots.clone(),
aggregate_final_result: progress
.last_voted_fork_slots_aggregate
.as_ref()
.and_then(|r| {
r.final_result.as_ref().map(|result| {
LastVotedForkSlotsFinalResult {
slots_stake_map: result.slots_stake_map.clone(),
total_active_stake: result.total_active_stake,
}
})
}),
},
progress,
))
@ -353,6 +530,24 @@ pub(crate) fn initialize(
Err(WenRestartError::MalformedLastVotedForkSlotsProtobuf(None).into())
}
}
RestartState::HeaviestFork => Ok((
WenRestartProgressInternalState::FindHeaviestFork {
aggregate_final_result: progress
.last_voted_fork_slots_aggregate
.as_ref()
.and_then(|r| {
r.final_result
.as_ref()
.map(|result| LastVotedForkSlotsFinalResult {
slots_stake_map: result.slots_stake_map.clone(),
total_active_stake: result.total_active_stake,
})
})
.unwrap(),
my_heaviest_fork: progress.my_heaviest_fork.clone(),
},
progress,
)),
_ => Err(WenRestartError::UnexpectedState(progress.state()).into()),
}
}
@ -380,7 +575,7 @@ pub(crate) fn write_wen_restart_records(
#[cfg(test)]
mod tests {
use {
crate::wen_restart::*,
crate::wen_restart::{tests::wen_restart_proto::LastVotedForkSlotsAggregateFinal, *},
assert_matches::assert_matches,
solana_gossip::{
cluster_info::ClusterInfo,
@ -419,7 +614,7 @@ mod tests {
fn push_restart_last_voted_fork_slots(
cluster_info: Arc<ClusterInfo>,
node: &LegacyContactInfo,
expected_slots_to_repair: &[Slot],
last_voted_fork_slots: &[Slot],
last_vote_hash: &Hash,
node_keypair: &Keypair,
wallclock: u64,
@ -427,7 +622,7 @@ mod tests {
let slots = RestartLastVotedForkSlots::new(
*node.pubkey(),
wallclock,
expected_slots_to_repair,
last_voted_fork_slots,
*last_vote_hash,
SHRED_VERSION,
)
@ -523,8 +718,7 @@ mod tests {
let start = timestamp();
let mut progress = WenRestartProgress {
state: RestartState::Init.into(),
my_last_voted_fork_slots: None,
last_voted_fork_slots_aggregate: None,
..Default::default()
};
loop {
if let Ok(new_progress) = read_wen_restart_records(&wen_restart_proto_path) {
@ -540,6 +734,14 @@ mod tests {
}
}
if timestamp().saturating_sub(start) > WAIT_FOR_THREAD_TIMEOUT {
assert_eq!(
progress.my_last_voted_fork_slots,
expected_progress.my_last_voted_fork_slots
);
assert_eq!(
progress.last_voted_fork_slots_aggregate,
expected_progress.last_voted_fork_slots_aggregate
);
panic!(
"wait_on_expected_progress_with_timeout failed to get expected progress {:?} expected {:?}",
&progress,
@ -586,6 +788,7 @@ mod tests {
#[test]
fn test_wen_restart_normal_flow() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let wen_restart_repair_slots = Some(Arc::new(RwLock::new(Vec::new())));
let test_state = wen_restart_test_init(&ledger_path);
@ -616,6 +819,9 @@ mod tests {
let mut rng = rand::thread_rng();
let mut expected_messages = HashMap::new();
// Skip the first 2 validators, because 0 is myself, we only need 8 more to reach > 80%.
let mut last_voted_fork_slots_from_others = test_state.last_voted_fork_slots.clone();
last_voted_fork_slots_from_others.reverse();
last_voted_fork_slots_from_others.append(&mut expected_slots_to_repair.clone());
for keypairs in test_state.validator_voting_keypairs.iter().skip(2) {
let node_pubkey = keypairs.node_keypair.pubkey();
let node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey));
@ -624,7 +830,7 @@ mod tests {
push_restart_last_voted_fork_slots(
test_state.cluster_info.clone(),
&node,
&expected_slots_to_repair,
&last_voted_fork_slots_from_others,
&last_vote_hash,
&keypairs.node_keypair,
now,
@ -632,7 +838,7 @@ mod tests {
expected_messages.insert(
node_pubkey.to_string(),
LastVotedForkSlotsRecord {
last_voted_fork_slots: expected_slots_to_repair.clone(),
last_voted_fork_slots: last_voted_fork_slots_from_others.clone(),
last_vote_bankhash: last_vote_hash.to_string(),
shred_version: SHRED_VERSION as u32,
wallclock: now,
@ -654,6 +860,14 @@ mod tests {
.as_ref()
.unwrap()
.wallclock;
let mut expected_slots_stake_map: HashMap<Slot, u64> = test_state
.last_voted_fork_slots
.iter()
.map(|slot| (*slot, 900))
.collect();
expected_slots_stake_map.extend(expected_slots_to_repair.iter().map(|slot| (*slot, 800)));
let expected_heaviest_fork_slot = last_vote_slot + 2;
let expected_heaviest_fork_bankhash = Hash::default();
assert_eq!(
progress,
WenRestartProgress {
@ -665,10 +879,19 @@ mod tests {
wallclock: progress_start_time,
}),
last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord {
received: expected_messages
received: expected_messages,
final_result: Some(LastVotedForkSlotsAggregateFinal {
slots_stake_map: expected_slots_stake_map,
total_active_stake: 900,
}),
}),
my_heaviest_fork: Some(HeaviestFork {
slot: expected_heaviest_fork_slot,
bankhash: expected_heaviest_fork_bankhash.to_string(),
total_active_stake: 900
}),
}
)
);
}
fn change_proto_file_readonly(wen_restart_proto_path: &PathBuf, readonly: bool) {
@ -734,8 +957,7 @@ mod tests {
assert_eq!(bankhash, last_vote_bankhash);
assert_eq!(progress, WenRestartProgress {
state: RestartState::Init.into(),
my_last_voted_fork_slots: None,
last_voted_fork_slots_aggregate: None,
..Default::default()
});
}
);
@ -743,8 +965,7 @@ mod tests {
&test_state.wen_restart_proto_path,
&WenRestartProgress {
state: RestartState::LastVotedForkSlots.into(),
my_last_voted_fork_slots: None,
last_voted_fork_slots_aggregate: None,
..Default::default()
},
);
assert_eq!(
@ -762,8 +983,7 @@ mod tests {
&test_state.wen_restart_proto_path,
&WenRestartProgress {
state: RestartState::WaitingForSupermajority.into(),
my_last_voted_fork_slots: None,
last_voted_fork_slots_aggregate: None,
..Default::default()
},
);
assert_eq!(
@ -785,8 +1005,7 @@ mod tests {
let test_state = wen_restart_test_init(&ledger_path);
let progress = wen_restart_proto::WenRestartProgress {
state: RestartState::Init.into(),
my_last_voted_fork_slots: None,
last_voted_fork_slots_aggregate: None,
..Default::default()
};
let original_progress = progress.clone();
assert_eq!(
@ -816,7 +1035,9 @@ mod tests {
}),
last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord {
received: HashMap::new(),
final_result: None,
}),
..Default::default()
},
);
}
@ -827,8 +1048,7 @@ mod tests {
let test_state = wen_restart_test_init(&ledger_path);
let progress = wen_restart_proto::WenRestartProgress {
state: RestartState::Init.into(),
my_last_voted_fork_slots: None,
last_voted_fork_slots_aggregate: None,
..Default::default()
};
assert!(write_wen_restart_records(&test_state.wen_restart_proto_path, &progress).is_ok());
change_proto_file_readonly(&test_state.wen_restart_proto_path, true);
@ -857,13 +1077,15 @@ mod tests {
}),
last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord {
received: HashMap::new(),
final_result: None,
}),
..Default::default()
},
);
}
#[test]
fn test_wen_restart_aggregate_last_voted_fork_failures() {
fn test_wen_restart_aggregate_last_voted_fork_stop_and_restart() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let test_state = wen_restart_test_init(&ledger_path);
@ -881,8 +1103,10 @@ mod tests {
wallclock: start_time,
}),
last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord {
received: HashMap::new()
received: HashMap::new(),
final_result: None,
}),
..Default::default()
}
)
.is_ok());
@ -890,6 +1114,9 @@ mod tests {
let mut expected_messages = HashMap::new();
let expected_slots_to_repair: Vec<Slot> =
(last_vote_slot + 1..last_vote_slot + 3).collect();
let mut last_voted_fork_slots_from_others = test_state.last_voted_fork_slots.clone();
last_voted_fork_slots_from_others.reverse();
last_voted_fork_slots_from_others.append(&mut expected_slots_to_repair.clone());
// Skip the first 2 validators, because 0 is myself, we need 8 so it hits 80%.
assert_eq!(test_state.validator_voting_keypairs.len(), 10);
let progress = WenRestartProgress {
@ -900,7 +1127,7 @@ mod tests {
shred_version: SHRED_VERSION as u32,
wallclock: start_time,
}),
last_voted_fork_slots_aggregate: None,
..Default::default()
};
for keypairs in test_state.validator_voting_keypairs.iter().skip(2) {
let wen_restart_proto_path_clone = test_state.wen_restart_proto_path.clone();
@ -934,7 +1161,7 @@ mod tests {
push_restart_last_voted_fork_slots(
test_state.cluster_info.clone(),
&node,
&expected_slots_to_repair,
&last_voted_fork_slots_from_others,
&last_vote_hash,
&keypairs.node_keypair,
now,
@ -942,13 +1169,12 @@ mod tests {
expected_messages.insert(
node_pubkey.to_string(),
LastVotedForkSlotsRecord {
last_voted_fork_slots: expected_slots_to_repair.clone(),
last_voted_fork_slots: last_voted_fork_slots_from_others.clone(),
last_vote_bankhash: last_vote_hash.to_string(),
shred_version: SHRED_VERSION as u32,
wallclock: now,
},
);
// Wait for the newly pushed message to be in written proto file.
wait_on_expected_progress_with_timeout(
test_state.wen_restart_proto_path.clone(),
WenRestartProgress {
@ -961,7 +1187,9 @@ mod tests {
}),
last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord {
received: expected_messages.clone(),
final_result: None,
}),
..Default::default()
},
);
exit.store(true, Ordering::Relaxed);
@ -980,7 +1208,7 @@ mod tests {
test_state,
last_vote_bankhash,
WenRestartProgress {
state: RestartState::Done.into(),
state: RestartState::LastVotedForkSlots.into(),
my_last_voted_fork_slots: Some(LastVotedForkSlotsRecord {
last_voted_fork_slots,
last_vote_bankhash: last_vote_bankhash.to_string(),
@ -989,7 +1217,9 @@ mod tests {
}),
last_voted_fork_slots_aggregate: Some(LastVotedForkSlotsAggregateRecord {
received: expected_messages,
final_result: None,
}),
..Default::default()
},
);
}
@ -1001,56 +1231,197 @@ mod tests {
let mut wen_restart_proto_path = my_dir.path().to_path_buf();
wen_restart_proto_path.push("wen_restart_status.proto");
let last_vote_bankhash = Hash::new_unique();
let mut state = WenRestartProgressInternalState::Init {
last_voted_fork_slots: vec![0, 1],
last_vote_bankhash,
};
let my_last_voted_fork_slots = Some(LastVotedForkSlotsRecord {
last_voted_fork_slots: vec![0, 1],
last_vote_bankhash: last_vote_bankhash.to_string(),
shred_version: 0,
wallclock: 0,
});
let mut progress = WenRestartProgress {
state: RestartState::Init.into(),
my_last_voted_fork_slots: my_last_voted_fork_slots.clone(),
last_voted_fork_slots_aggregate: None,
};
for (expected_state, expected_progress) in [
let last_voted_fork_slots_aggregate = Some(LastVotedForkSlotsAggregateRecord {
received: HashMap::new(),
final_result: Some(LastVotedForkSlotsAggregateFinal {
slots_stake_map: vec![(0, 900), (1, 800)].into_iter().collect(),
total_active_stake: 900,
}),
});
let expected_slots_stake_map: HashMap<Slot, u64> =
vec![(0, 900), (1, 800)].into_iter().collect();
for (entrance_state, exit_state, entrance_progress, exit_progress) in [
(
WenRestartProgressInternalState::Init {
last_voted_fork_slots: vec![0, 1],
last_vote_bankhash,
},
WenRestartProgressInternalState::LastVotedForkSlots {
last_voted_fork_slots: vec![0, 1],
aggregate_final_result: None,
},
WenRestartProgress {
state: RestartState::LastVotedForkSlots.into(),
my_last_voted_fork_slots: my_last_voted_fork_slots.clone(),
last_voted_fork_slots_aggregate: None,
..Default::default()
},
WenRestartProgress {
state: RestartState::LastVotedForkSlots.into(),
my_last_voted_fork_slots: my_last_voted_fork_slots.clone(),
..Default::default()
},
),
(
WenRestartProgressInternalState::LastVotedForkSlots {
last_voted_fork_slots: vec![0, 1],
aggregate_final_result: Some(LastVotedForkSlotsFinalResult {
slots_stake_map: expected_slots_stake_map.clone(),
total_active_stake: 900,
}),
},
WenRestartProgressInternalState::FindHeaviestFork {
aggregate_final_result: LastVotedForkSlotsFinalResult {
slots_stake_map: expected_slots_stake_map.clone(),
total_active_stake: 900,
},
my_heaviest_fork: None,
},
WenRestartProgress {
state: RestartState::LastVotedForkSlots.into(),
my_last_voted_fork_slots: my_last_voted_fork_slots.clone(),
last_voted_fork_slots_aggregate: last_voted_fork_slots_aggregate.clone(),
..Default::default()
},
WenRestartProgress {
state: RestartState::HeaviestFork.into(),
my_last_voted_fork_slots: my_last_voted_fork_slots.clone(),
last_voted_fork_slots_aggregate: last_voted_fork_slots_aggregate.clone(),
..Default::default()
},
),
(
WenRestartProgressInternalState::FindHeaviestFork {
aggregate_final_result: LastVotedForkSlotsFinalResult {
slots_stake_map: expected_slots_stake_map,
total_active_stake: 900,
},
my_heaviest_fork: Some(HeaviestFork {
slot: 1,
bankhash: Hash::default().to_string(),
total_active_stake: 900,
}),
},
WenRestartProgressInternalState::Done,
WenRestartProgress {
state: RestartState::HeaviestFork.into(),
my_last_voted_fork_slots: my_last_voted_fork_slots.clone(),
last_voted_fork_slots_aggregate: last_voted_fork_slots_aggregate.clone(),
..Default::default()
},
WenRestartProgress {
state: RestartState::Done.into(),
my_last_voted_fork_slots,
last_voted_fork_slots_aggregate: None,
my_last_voted_fork_slots: my_last_voted_fork_slots.clone(),
last_voted_fork_slots_aggregate: last_voted_fork_slots_aggregate.clone(),
my_heaviest_fork: Some(HeaviestFork {
slot: 1,
bankhash: Hash::default().to_string(),
total_active_stake: 900,
}),
},
),
] {
state = increment_and_write_wen_restart_records(
let mut progress = entrance_progress;
let state = increment_and_write_wen_restart_records(
&wen_restart_proto_path,
state,
entrance_state,
&mut progress,
)
.unwrap();
assert_eq!(&state, &expected_state);
assert_eq!(&progress, &expected_progress);
assert_eq!(&state, &exit_state);
assert_eq!(&progress, &exit_progress);
}
let mut progress = WenRestartProgress {
state: RestartState::Done.into(),
my_last_voted_fork_slots: my_last_voted_fork_slots.clone(),
last_voted_fork_slots_aggregate: last_voted_fork_slots_aggregate.clone(),
..Default::default()
};
assert_eq!(
increment_and_write_wen_restart_records(&wen_restart_proto_path, state, &mut progress)
.unwrap_err()
.downcast::<WenRestartError>()
.unwrap(),
increment_and_write_wen_restart_records(
&wen_restart_proto_path,
WenRestartProgressInternalState::Done,
&mut progress
)
.unwrap_err()
.downcast::<WenRestartError>()
.unwrap(),
WenRestartError::UnexpectedState(RestartState::Done),
);
}
#[test]
fn test_find_heaviest_fork_failures() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let exit = Arc::new(AtomicBool::new(false));
let test_state = wen_restart_test_init(&ledger_path);
let last_vote_slot = test_state.last_voted_fork_slots[0];
let slot_with_no_block = last_vote_slot + 5;
// This fails because corresponding block is not found, which is wrong, we should have
// repaired all eligible blocks when we exit LastVotedForkSlots state.
assert_eq!(
find_heaviest_fork(
LastVotedForkSlotsFinalResult {
slots_stake_map: vec![(0, 900), (slot_with_no_block, 800)]
.into_iter()
.collect(),
total_active_stake: 900,
},
test_state.bank_forks.clone(),
test_state.blockstore.clone(),
exit.clone(),
)
.unwrap_err()
.downcast::<WenRestartError>()
.unwrap(),
WenRestartError::BlockNotFound(slot_with_no_block),
);
// The following fails because we expect to see the first slot in slots_stake_map doesn't chain to local root.
assert_eq!(
find_heaviest_fork(
LastVotedForkSlotsFinalResult {
slots_stake_map: vec![(last_vote_slot, 900)].into_iter().collect(),
total_active_stake: 900,
},
test_state.bank_forks.clone(),
test_state.blockstore.clone(),
exit.clone(),
)
.unwrap_err()
.downcast::<WenRestartError>()
.unwrap(),
WenRestartError::BlockNotLinkedToExpectedParent(
last_vote_slot,
Some(last_vote_slot - 1),
0
),
);
// The following fails because we expect to see the some slot in slots_stake_map doesn't chain to the
// one before it.
assert_eq!(
find_heaviest_fork(
LastVotedForkSlotsFinalResult {
slots_stake_map: vec![(1, 900), (last_vote_slot, 900)].into_iter().collect(),
total_active_stake: 900,
},
test_state.bank_forks.clone(),
test_state.blockstore.clone(),
exit.clone(),
)
.unwrap_err()
.downcast::<WenRestartError>()
.unwrap(),
WenRestartError::BlockNotLinkedToExpectedParent(
last_vote_slot,
Some(last_vote_slot - 1),
1
),
);
}
}