Add ancestor hashes to state machine (#31627)

* Notify replay of pruned duplicate confirmed slots

* Ingest replay signal and run ancestor hashes for pruned

* Forward PDC to ancestor hashes and ingest pruned dumps from ancestor hashes service

* Add local-cluster test
This commit is contained in:
Ashwin Sekar 2023-05-13 02:05:44 -07:00 committed by GitHub
parent 618d8cf2a6
commit ef75f1cb4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1938 additions and 222 deletions

1
Cargo.lock generated
View File

@ -6082,6 +6082,7 @@ dependencies = [
"solana-tpu-client",
"solana-vote-program",
"tempfile",
"trees",
]
[[package]]

File diff suppressed because it is too large Load Diff

View File

@ -219,6 +219,7 @@ pub struct EpochSlotsFrozenState {
epoch_slots_frozen_hash: Hash,
duplicate_confirmed_hash: Option<Hash>,
bank_status: BankStatus,
is_popular_pruned: bool,
}
impl EpochSlotsFrozenState {
pub fn new_from_state(
@ -228,6 +229,7 @@ impl EpochSlotsFrozenState {
fork_choice: &mut HeaviestSubtreeForkChoice,
is_dead: impl Fn() -> bool,
get_hash: impl Fn() -> Option<Hash>,
is_popular_pruned: bool,
) -> Self {
let bank_status = BankStatus::new(is_dead, get_hash);
let duplicate_confirmed_hash = get_duplicate_confirmed_hash_from_state(
@ -240,6 +242,7 @@ impl EpochSlotsFrozenState {
epoch_slots_frozen_hash,
duplicate_confirmed_hash,
bank_status,
is_popular_pruned,
)
}
@ -247,13 +250,19 @@ impl EpochSlotsFrozenState {
epoch_slots_frozen_hash: Hash,
duplicate_confirmed_hash: Option<Hash>,
bank_status: BankStatus,
is_popular_pruned: bool,
) -> Self {
Self {
epoch_slots_frozen_hash,
duplicate_confirmed_hash,
bank_status,
is_popular_pruned,
}
}
/// Whether this epoch-slots sample concerns a fork that was pruned after
/// becoming popular (see `SlotStateUpdate::PopularPrunedFork`); such slots
/// will never finish replay, so they are processed even without a bank hash.
fn is_popular_pruned(&self) -> bool {
    self.is_popular_pruned
}
}
#[derive(PartialEq, Eq, Debug)]
@ -263,14 +272,18 @@ pub enum SlotStateUpdate {
Dead(DeadState),
Duplicate(DuplicateState),
EpochSlotsFrozen(EpochSlotsFrozenState),
// The fork is pruned but has reached `DUPLICATE_THRESHOLD` from votes aggregated across
// descendants and all versions of the slots on this fork.
PopularPrunedFork,
}
impl SlotStateUpdate {
fn into_state_changes(self, slot: Slot) -> Vec<ResultingStateChange> {
let bank_frozen_hash = self.bank_hash();
if bank_frozen_hash.is_none() {
if bank_frozen_hash.is_none() && !self.is_popular_pruned() {
// If the bank hasn't been frozen yet, then there's nothing to do
// since replay of the slot hasn't finished yet.
// However if the bank is pruned, then replay will never finish so we still process now
return vec![];
}
@ -286,6 +299,7 @@ impl SlotStateUpdate {
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state) => {
on_epoch_slots_frozen(slot, epoch_slots_frozen_state)
}
SlotStateUpdate::PopularPrunedFork => on_popular_pruned_fork(slot),
}
}
@ -300,6 +314,17 @@ impl SlotStateUpdate {
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state) => {
epoch_slots_frozen_state.bank_status.bank_hash()
}
SlotStateUpdate::PopularPrunedFork => None,
}
}
/// True when this state update concerns a popular pruned fork, either
/// directly (`PopularPrunedFork`) or through an epoch-slots-frozen state
/// carrying the pruned flag.
fn is_popular_pruned(&self) -> bool {
    match self {
        SlotStateUpdate::EpochSlotsFrozen(state) => state.is_popular_pruned(),
        SlotStateUpdate::PopularPrunedFork => true,
        _ => false,
    }
}
}
@ -344,6 +369,7 @@ fn check_duplicate_confirmed_hash_against_frozen_hash(
slot, duplicate_confirmed_hash, bank_frozen_hash
);
}
state_changes.push(ResultingStateChange::MarkSlotDuplicate(bank_frozen_hash));
state_changes.push(ResultingStateChange::RepairDuplicateConfirmedVersion(
duplicate_confirmed_hash,
@ -364,6 +390,7 @@ fn check_epoch_slots_hash_against_frozen_hash(
epoch_slots_frozen_hash: Hash,
bank_frozen_hash: Hash,
is_dead: bool,
is_popular_pruned: bool,
) {
if epoch_slots_frozen_hash != bank_frozen_hash {
if is_dead {
@ -373,6 +400,12 @@ fn check_epoch_slots_hash_against_frozen_hash(
"EpochSlots sample returned slot {} with hash {}, but we marked slot dead",
slot, epoch_slots_frozen_hash
);
} else if is_popular_pruned {
// The cluster sample found the troublesome slot which caused this fork to be pruned
warn!(
"EpochSlots sample returned slot {slot} with hash {epoch_slots_frozen_hash}, but we
have pruned it due to incorrect ancestry"
);
} else {
// The duplicate confirmed slot hash does not match our frozen hash.
// Modify fork choice rule to exclude our version from being voted
@ -383,7 +416,11 @@ fn check_epoch_slots_hash_against_frozen_hash(
slot, epoch_slots_frozen_hash, bank_frozen_hash
);
}
state_changes.push(ResultingStateChange::MarkSlotDuplicate(bank_frozen_hash));
if !is_popular_pruned {
// If the slot is already pruned, it will already be pruned from fork choice so no
// reason to mark as duplicate
state_changes.push(ResultingStateChange::MarkSlotDuplicate(bank_frozen_hash));
}
state_changes.push(ResultingStateChange::RepairDuplicateConfirmedVersion(
epoch_slots_frozen_hash,
));
@ -420,12 +457,14 @@ fn on_dead_slot(slot: Slot, dead_state: DeadState) -> Vec<ResultingStateChange>
// match arm above.
let bank_hash = Hash::default();
let is_dead = true;
let is_popular_pruned = false;
check_epoch_slots_hash_against_frozen_hash(
&mut state_changes,
slot,
epoch_slots_frozen_hash,
bank_hash,
is_dead,
is_popular_pruned,
);
}
}
@ -466,12 +505,14 @@ fn on_frozen_slot(slot: Slot, bank_frozen_state: BankFrozenState) -> Vec<Resulti
// Lower priority than having seen an actual duplicate confirmed hash in the
// match arm above.
let is_dead = false;
let is_popular_pruned = false;
check_epoch_slots_hash_against_frozen_hash(
&mut state_changes,
slot,
epoch_slots_frozen_hash,
frozen_hash,
is_dead,
is_popular_pruned,
);
}
}
@ -559,28 +600,49 @@ fn on_epoch_slots_frozen(
bank_status,
epoch_slots_frozen_hash,
duplicate_confirmed_hash,
is_popular_pruned,
} = epoch_slots_frozen_state;
if let Some(duplicate_confirmed_hash) = duplicate_confirmed_hash {
if epoch_slots_frozen_hash != duplicate_confirmed_hash {
warn!(
"EpochSlots sample returned slot {} with hash {}, but we already saw
// If `slot` has already been duplicate confirmed, `epoch_slots_frozen` becomes redundant as
// one of the following triggers would have already processed `slot`:
//
// 1) If the bank was replayed and then duplicate confirmed through turbine/gossip, the
// corresponding `SlotStateUpdate::DuplicateConfirmed`
// 2) If the slot was first duplicate confirmed through gossip and then replayed, the
// corresponding `SlotStateUpdate::BankFrozen` or `SlotStateUpdate::Dead`
//
// However if `slot` was first duplicate confirmed through gossip and then pruned before
// we got a chance to replay, there was no trigger that would have processed `slot`.
// The original `SlotStateUpdate::DuplicateConfirmed` is a no-op when the bank has not been
// replayed yet, and unlike 2) there is no upcoming `SlotStateUpdate::BankFrozen` or
// `SlotStateUpdate::Dead`, as `slot` is pruned and will not be replayed.
//
// Thus if we have a duplicate confirmation, but `slot` is pruned, we continue
// processing it as `epoch_slots_frozen`.
if !is_popular_pruned {
if let Some(duplicate_confirmed_hash) = duplicate_confirmed_hash {
if epoch_slots_frozen_hash != duplicate_confirmed_hash {
warn!(
"EpochSlots sample returned slot {} with hash {}, but we already saw
duplicate confirmation on hash: {:?}",
slot, epoch_slots_frozen_hash, duplicate_confirmed_hash
);
}
return vec![];
}
match bank_status {
BankStatus::Dead | BankStatus::Frozen(_) => (),
// No action to be taken yet
BankStatus::Unprocessed => {
slot, epoch_slots_frozen_hash, duplicate_confirmed_hash
);
}
return vec![];
}
}
let frozen_hash = bank_status.bank_hash().expect("bank hash must exist");
match bank_status {
BankStatus::Dead | BankStatus::Frozen(_) => (),
// No action to be taken yet unless `slot` is pruned in which case it will never be played
BankStatus::Unprocessed => {
if !is_popular_pruned {
return vec![];
}
}
}
let frozen_hash = bank_status.bank_hash().unwrap_or_default();
let is_dead = bank_status.is_dead();
let mut state_changes = vec![];
check_epoch_slots_hash_against_frozen_hash(
@ -589,11 +651,21 @@ fn on_epoch_slots_frozen(
epoch_slots_frozen_hash,
frozen_hash,
is_dead,
is_popular_pruned,
);
state_changes
}
/// Handles `SlotStateUpdate::PopularPrunedFork` for `slot`: the fork is pruned
/// yet has reached `DUPLICATE_THRESHOLD` aggregated across descendants and slot
/// versions, so replay will never process it. The only resulting action is to
/// notify the ancestor hashes service so it can sample the cluster and locate
/// the mismatched ancestor to repair.
fn on_popular_pruned_fork(slot: Slot) -> Vec<ResultingStateChange> {
    warn!("{slot} is part of a pruned fork which has reached the DUPLICATE_THRESHOLD aggregating across descendants
and slot versions. It is suspected to be duplicate or have an ancestor that is duplicate.
Notifying ancestor_hashes_service");
    vec![ResultingStateChange::SendAncestorHashesReplayUpdate(
        AncestorHashesReplayUpdate::PopularPrunedFork(slot),
    )]
}
fn get_cluster_confirmed_hash_from_state(
slot: Slot,
gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
@ -1252,7 +1324,7 @@ mod test {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = None;
let bank_status = BankStatus::Unprocessed;
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
Vec::<ResultingStateChange>::new()
@ -1262,7 +1334,7 @@ mod test {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = Some(Hash::new_unique());
let bank_status = BankStatus::Unprocessed;
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
Vec::<ResultingStateChange>::new()
@ -1272,7 +1344,7 @@ mod test {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = Some(epoch_slots_frozen_hash);
let bank_status = BankStatus::Unprocessed;
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
Vec::<ResultingStateChange>::new()
@ -1282,7 +1354,7 @@ mod test {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = None;
let bank_status = BankStatus::Dead;
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
vec![
@ -1294,7 +1366,7 @@ mod test {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = Some(Hash::new_unique());
let bank_status = BankStatus::Dead;
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
Vec::<ResultingStateChange>::new()
@ -1304,7 +1376,7 @@ mod test {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = Some(epoch_slots_frozen_hash);
let bank_status = BankStatus::Dead;
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
Vec::<ResultingStateChange>::new()
@ -1315,7 +1387,7 @@ mod test {
let duplicate_confirmed_hash = None;
let frozen_hash = Hash::new_unique();
let bank_status = BankStatus::Frozen(frozen_hash);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
vec![
@ -1327,7 +1399,7 @@ mod test {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = None;
let bank_status = BankStatus::Frozen(epoch_slots_frozen_hash);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
Vec::<ResultingStateChange>::new()
@ -1337,7 +1409,7 @@ mod test {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = Some(Hash::new_unique());
let bank_status = BankStatus::Frozen(Hash::new_unique());
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
Vec::<ResultingStateChange>::new()
@ -1347,7 +1419,7 @@ mod test {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = Some(Hash::new_unique());
let bank_status = BankStatus::Frozen(epoch_slots_frozen_hash);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
Vec::<ResultingStateChange>::new()
@ -1357,12 +1429,121 @@ mod test {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = Some(Hash::new_unique());
let bank_status = BankStatus::Frozen(duplicate_confirmed_hash.unwrap());
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
Vec::<ResultingStateChange>::new()
)
},
epoch_slots_frozen_state_update_11: {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = None;
let bank_status = BankStatus::Unprocessed;
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)],
)
},
epoch_slots_frozen_state_update_12: {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = Some(Hash::new_unique());
let bank_status = BankStatus::Unprocessed;
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)],
)
},
epoch_slots_frozen_state_update_13: {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = Some(epoch_slots_frozen_hash);
let bank_status = BankStatus::Unprocessed;
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)],
)
},
epoch_slots_frozen_state_update_14: {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = None;
let bank_status = BankStatus::Dead;
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)],
)
},
epoch_slots_frozen_state_update_15: {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = Some(Hash::new_unique());
let bank_status = BankStatus::Dead;
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)],
)
},
epoch_slots_frozen_state_update_16: {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = Some(epoch_slots_frozen_hash);
let bank_status = BankStatus::Dead;
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)],
)
},
epoch_slots_frozen_state_update_17: {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = None;
let frozen_hash = Hash::new_unique();
let bank_status = BankStatus::Frozen(frozen_hash);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)],
)
},
epoch_slots_frozen_state_update_18: {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = None;
let bank_status = BankStatus::Frozen(epoch_slots_frozen_hash);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
Vec::<ResultingStateChange>::new()
)
},
epoch_slots_frozen_state_update_19: {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = Some(Hash::new_unique());
let bank_status = BankStatus::Frozen(Hash::new_unique());
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)],
)
},
epoch_slots_frozen_state_update_20: {
let epoch_slots_frozen_hash = Hash::new_unique();
let duplicate_confirmed_hash = Some(Hash::new_unique());
let bank_status = BankStatus::Frozen(epoch_slots_frozen_hash);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true);
(
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
Vec::<ResultingStateChange>::new()
)
},
popular_pruned_fork: {
(
SlotStateUpdate::PopularPrunedFork,
vec![ResultingStateChange::SendAncestorHashesReplayUpdate(
AncestorHashesReplayUpdate::PopularPrunedFork(10),
)]
)
},
}
struct InitialState {
@ -2072,6 +2253,7 @@ mod test {
&mut heaviest_subtree_fork_choice,
|| progress.is_dead(3).unwrap_or(false),
|| Some(slot3_hash),
false,
);
let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) =
unbounded();
@ -2165,6 +2347,7 @@ mod test {
&mut heaviest_subtree_fork_choice,
|| progress.is_dead(3).unwrap_or(false),
|| Some(slot3_hash),
false,
);
let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) =
unbounded();

View File

@ -1,11 +1,24 @@
use {
solana_ledger::blockstore::Blockstore,
solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey, timing::timestamp},
std::{collections::HashMap, net::SocketAddr},
std::{
collections::HashMap,
net::SocketAddr,
sync::atomic::{AtomicUsize, Ordering},
},
};
// Number of validators to sample for the ancestor repair
pub const ANCESTOR_HASH_REPAIR_SAMPLE_SIZE: usize = 21;
// Number of validators to sample for the ancestor repair.
// We use a static (rather than a const) so tests can shrink the sample size
// instead of having to spin up 21 validators.
static ANCESTOR_HASH_REPAIR_SAMPLE_SIZE: AtomicUsize = AtomicUsize::new(21);

/// Returns the current number of validators sampled per ancestor-hashes
/// repair request.
pub fn get_ancestor_hash_repair_sample_size() -> usize {
    ANCESTOR_HASH_REPAIR_SAMPLE_SIZE.load(Ordering::Relaxed)
}

/// Overrides the ancestor-hashes sample size. Test-only; uses relaxed
/// ordering, so set it before any repair activity begins.
pub fn set_ancestor_hash_repair_sample_size_for_tests_only(sample_size: usize) {
    ANCESTOR_HASH_REPAIR_SAMPLE_SIZE.store(sample_size, Ordering::Relaxed);
}
// Even assuming 20% of validators malicious, the chance that >= 11 of the
// ANCESTOR_HASH_REPAIR_SAMPLE_SIZE = 21 validators is malicious is roughly 1/1000.
@ -14,10 +27,12 @@ pub const ANCESTOR_HASH_REPAIR_SAMPLE_SIZE: usize = 21;
// On the other hand with a 52-48 split of validators with one version of the block vs
// another, the chance of >= 11 of the 21 sampled being from the 52% portion is
// about 57%, so we should be able to find a correct sample in a reasonable amount of time.
const MINIMUM_ANCESTOR_AGREEMENT_SIZE: usize = (ANCESTOR_HASH_REPAIR_SAMPLE_SIZE + 1) / 2;
/// Simple majority of the current ancestor-hashes sample size, rounded up
/// (e.g. 11 of 21). Used as the agreement threshold when deciding whether
/// enough sampled validators returned the same set of ancestors.
pub fn get_minimum_ancestor_agreement_size() -> usize {
    (get_ancestor_hash_repair_sample_size() + 1) / 2
}
const RETRY_INTERVAL_SECONDS: usize = 5;
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum DuplicateAncestorDecision {
InvalidSample,
AncestorsAllMatch,
@ -25,6 +40,7 @@ pub enum DuplicateAncestorDecision {
ContinueSearch(DuplicateSlotRepairStatus),
EarliestAncestorNotFrozen(DuplicateSlotRepairStatus),
EarliestMismatchFound(DuplicateSlotRepairStatus),
EarliestPrunedMismatchFound(DuplicateSlotRepairStatus),
}
impl DuplicateAncestorDecision {
@ -34,13 +50,14 @@ impl DuplicateAncestorDecision {
DuplicateAncestorDecision::InvalidSample
// It may be possible the validators have not yet detected duplicate confirmation
// so retry
| DuplicateAncestorDecision::SampleNotDuplicateConfirmed => true,
| DuplicateAncestorDecision::SampleNotDuplicateConfirmed => true,
DuplicateAncestorDecision::AncestorsAllMatch => false,
DuplicateAncestorDecision::ContinueSearch(_status)
| DuplicateAncestorDecision::EarliestAncestorNotFrozen(_status)
| DuplicateAncestorDecision::EarliestMismatchFound(_status) => false,
| DuplicateAncestorDecision::EarliestMismatchFound(_status)
| DuplicateAncestorDecision::EarliestPrunedMismatchFound(_status) => false,
}
}
@ -49,20 +66,24 @@ impl DuplicateAncestorDecision {
DuplicateAncestorDecision::InvalidSample
| DuplicateAncestorDecision::AncestorsAllMatch
| DuplicateAncestorDecision::SampleNotDuplicateConfirmed => None,
DuplicateAncestorDecision::ContinueSearch(status) => Some(status),
DuplicateAncestorDecision::EarliestAncestorNotFrozen(status) => Some(status),
DuplicateAncestorDecision::EarliestMismatchFound(status) => Some(status),
DuplicateAncestorDecision::ContinueSearch(status)
| DuplicateAncestorDecision::EarliestAncestorNotFrozen(status)
| DuplicateAncestorDecision::EarliestMismatchFound(status)
| DuplicateAncestorDecision::EarliestPrunedMismatchFound(status) => Some(status),
}
}
fn repair_status_mut(&mut self) -> Option<&mut DuplicateSlotRepairStatus> {
pub fn repair_status_mut(&mut self) -> Option<&mut DuplicateSlotRepairStatus> {
match self {
DuplicateAncestorDecision::InvalidSample
| DuplicateAncestorDecision::AncestorsAllMatch
| DuplicateAncestorDecision::SampleNotDuplicateConfirmed => None,
DuplicateAncestorDecision::ContinueSearch(status) => Some(status),
DuplicateAncestorDecision::EarliestAncestorNotFrozen(status) => Some(status),
DuplicateAncestorDecision::EarliestMismatchFound(status) => Some(status),
DuplicateAncestorDecision::ContinueSearch(status)
| DuplicateAncestorDecision::EarliestAncestorNotFrozen(status)
| DuplicateAncestorDecision::EarliestMismatchFound(status)
| DuplicateAncestorDecision::EarliestPrunedMismatchFound(status) => Some(status),
}
}
}
@ -94,11 +115,69 @@ impl DuplicateSlotRepairStatus {
}
}
/// Condition that initiated an ancestor-hashes repair request.
#[derive(Default, Clone, Copy, PartialEq, Eq, Debug)]
pub enum AncestorRequestType {
    /// The requested slot was dead and duplicate confirmed (the default case).
    #[default]
    DeadDuplicateConfirmed,
    /// The requested slot is on a pruned fork that became popular.
    PopularPruned,
}

impl AncestorRequestType {
    /// True iff the request was initiated for a popular pruned fork.
    pub fn is_pruned(&self) -> bool {
        match self {
            Self::PopularPruned => true,
            Self::DeadDuplicateConfirmed => false,
        }
    }
}
/// Set of (slot, correct hash) pairs that `ancestor_hashes_service` determined
/// need to be repaired, together with the condition that triggered the search.
pub struct AncestorDuplicateSlotsToRepair {
    // Slots that `ancestor_hashes_service` found that need to be repaired
    pub slots_to_repair: Vec<(Slot, Hash)>,
    // Condition that initiated this request
    pub request_type: AncestorRequestType,
}

impl AncestorDuplicateSlotsToRepair {
    /// True when the service found no slots needing repair.
    pub fn is_empty(&self) -> bool {
        self.slots_to_repair.is_empty()
    }
}
/// Outcome of a completed ancestor-hashes request: the slot that triggered it,
/// why it was sent, and the decision reached from the sampled responses.
#[derive(Debug, PartialEq, Eq)]
pub struct AncestorRequestDecision {
    // The slot that initiated this request
    pub slot: Slot,
    // Condition which initiated this request
    pub request_type: AncestorRequestType,
    // Decision
    pub decision: DuplicateAncestorDecision,
}

impl AncestorRequestDecision {
    /// Consumes the decision and, when it carries a repair status, moves its
    /// `correct_ancestors_to_repair` out into an `AncestorDuplicateSlotsToRepair`.
    /// Returns `None` for decisions with no repair status.
    pub fn slots_to_repair(self) -> Option<AncestorDuplicateSlotsToRepair> {
        let Self {
            request_type,
            mut decision,
            ..
        } = self;
        let status = decision.repair_status_mut()?;
        let slots_to_repair = std::mem::take(&mut status.correct_ancestors_to_repair);
        Some(AncestorDuplicateSlotsToRepair {
            slots_to_repair,
            request_type,
        })
    }

    /// Whether the underlying decision warrants retrying the request.
    pub fn is_retryable(&self) -> bool {
        self.decision.is_retryable()
    }
}
#[derive(Default, Clone)]
pub struct AncestorRequestStatus {
// The mismatched slot that was the subject of the AncestorHashes(requested_mismatched_slot)
// repair request. All responses to this request should be for ancestors of this slot.
requested_mismatched_slot: Slot,
// Condition which initiated this request
request_type: AncestorRequestType,
// Timestamp at which we sent out the requests
start_ts: u64,
// The addresses of the validators we asked for a response, a response is only acceptable
@ -119,9 +198,11 @@ impl AncestorRequestStatus {
pub fn new(
sampled_validators: impl Iterator<Item = SocketAddr>,
requested_mismatched_slot: Slot,
request_type: AncestorRequestType,
) -> Self {
AncestorRequestStatus {
requested_mismatched_slot,
request_type,
start_ts: timestamp(),
sampled_validators: sampled_validators.map(|p| (p, false)).collect(),
..AncestorRequestStatus::default()
@ -163,7 +244,7 @@ impl AncestorRequestStatus {
// If we got enough of the sampled validators to respond, we are confident
// this is the correct set of ancestors
if validators_with_same_response.len()
== MINIMUM_ANCESTOR_AGREEMENT_SIZE.min(self.sampled_validators.len())
== get_minimum_ancestor_agreement_size().min(self.sampled_validators.len())
{
// When we reach MINIMUM_ANCESTOR_AGREEMENT_SIZE of the same responses,
// check for mismatches.
@ -175,7 +256,8 @@ impl AncestorRequestStatus {
// If everyone responded and we still haven't agreed upon a set of
// ancestors, that means there was a lot of disagreement and we sampled
// a bad set of validators.
if self.num_responses == ANCESTOR_HASH_REPAIR_SAMPLE_SIZE.min(self.sampled_validators.len())
if self.num_responses
== get_ancestor_hash_repair_sample_size().min(self.sampled_validators.len())
{
info!(
"{} return invalid sample no agreement",
@ -187,6 +269,10 @@ impl AncestorRequestStatus {
None
}
/// Returns the condition that initiated this ancestor-hashes request.
pub fn request_type(&self) -> AncestorRequestType {
    self.request_type
}
fn handle_sampled_validators_reached_agreement(
&mut self,
blockstore: &Blockstore,
@ -227,7 +313,6 @@ impl AncestorRequestStatus {
// Responses were not properly ordered
return DuplicateAncestorDecision::InvalidSample;
}
last_ancestor = *ancestor_slot;
if *ancestor_slot > self.requested_mismatched_slot {
// We should only get ancestors of `self.requested_mismatched_slot`
// in valid responses
@ -254,6 +339,31 @@ impl AncestorRequestStatus {
),
));
}
} else if earliest_erroring_ancestor.is_none() && self.request_type.is_pruned() {
// If the slot we are requesting for is pruned, then the slot and many of its
// ancestors may not have a frozen hash (unlike dead slots where all the ancestors
// will have a frozen hash). Thus the best we can do is to compare the slot numbers
// to find the first ancestor that has the wrong parent, or the first missing
// ancestor.
//
// We return the earliest such mismatch.
if let Ok(Some(meta)) = blockstore.meta(*ancestor_slot) {
if i != 0 && meta.parent_slot != Some(last_ancestor) {
earliest_erroring_ancestor = Some((
agreed_response.len() - i - 1,
DuplicateAncestorDecision::EarliestPrunedMismatchFound(
DuplicateSlotRepairStatus::default(),
),
));
}
} else {
earliest_erroring_ancestor = Some((
agreed_response.len() - i - 1,
DuplicateAncestorDecision::EarliestPrunedMismatchFound(
DuplicateSlotRepairStatus::default(),
),
));
}
} else if earliest_erroring_ancestor.is_none() {
// If in our current ledger, `ancestor_slot` is actually on the same fork as
// `self.requested_mismatched_slot`, then the `frozen_hash` should not be None here.
@ -290,9 +400,9 @@ impl AncestorRequestStatus {
// ancestors?
//
// There are two cases:
// 1) The first such mismatch `first_mismatch` appears BEFORE the slot `4` that is
// 1) The first such mismatch `first_mismatch` appears somewhere BEFORE the slot `4` that is
// missing from our blockstore.
// 2) The first such mismatch `first_mismatch` appears AFTER the slot `4` that is
// 2) The first such mismatch `first_mismatch` appears immediately AFTER the slot `4` that is
// missing from our blockstore.
//
// Because we know any mismatches will also trigger the mismatch casing earlier in
@ -312,6 +422,7 @@ impl AncestorRequestStatus {
),
));
}
last_ancestor = *ancestor_slot;
}
if let Some((earliest_erroring_ancestor_index, mut decision)) = earliest_erroring_ancestor {
@ -358,6 +469,7 @@ pub mod tests {
solana_ledger::get_tmp_ledger_path_auto_delete,
std::{collections::BTreeMap, net::IpAddr},
tempfile::TempDir,
trees::tr,
};
struct TestSetup {
@ -374,13 +486,21 @@ pub mod tests {
SocketAddr::new(ip, 8080)
}
fn setup_add_response_test(request_slot: Slot, num_ancestors_in_response: usize) -> TestSetup {
fn setup_add_response_test_with_type(
request_slot: Slot,
num_ancestors_in_response: usize,
request_type: AncestorRequestType,
) -> TestSetup {
assert!(request_slot >= num_ancestors_in_response as u64);
let sampled_addresses: Vec<SocketAddr> = std::iter::repeat_with(create_rand_socket_addr)
.take(ANCESTOR_HASH_REPAIR_SAMPLE_SIZE)
.take(get_ancestor_hash_repair_sample_size())
.collect();
let status = AncestorRequestStatus::new(sampled_addresses.iter().cloned(), request_slot);
let status = AncestorRequestStatus::new(
sampled_addresses.iter().cloned(),
request_slot,
request_type,
);
let blockstore_temp_dir = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(blockstore_temp_dir.path()).unwrap();
@ -399,6 +519,25 @@ pub mod tests {
}
}
/// Convenience wrapper: builds the add-response test fixture with the default
/// `DeadDuplicateConfirmed` request type.
fn setup_add_response_test(request_slot: Slot, num_ancestors_in_response: usize) -> TestSetup {
    setup_add_response_test_with_type(
        request_slot,
        num_ancestors_in_response,
        AncestorRequestType::DeadDuplicateConfirmed,
    )
}
/// Convenience wrapper: builds the add-response test fixture for a request
/// initiated because the slot is on a popular pruned fork.
fn setup_add_response_test_pruned(
    request_slot: Slot,
    num_ancestors_in_response: usize,
) -> TestSetup {
    setup_add_response_test_with_type(
        request_slot,
        num_ancestors_in_response,
        AncestorRequestType::PopularPruned,
    )
}
#[test]
fn test_add_response_invalid_peer() {
let request_slot = 100;
@ -433,7 +572,7 @@ pub mod tests {
incorrect_ancestors_response.pop().unwrap();
// Add a mixture of correct and incorrect responses from the same `responder_addr`.
let num_repeated_responses = ANCESTOR_HASH_REPAIR_SAMPLE_SIZE;
let num_repeated_responses = get_ancestor_hash_repair_sample_size();
let responder_addr = &sampled_addresses[0];
for i in 0..num_repeated_responses {
let response = if i % 2 == 0 {
@ -489,7 +628,7 @@ pub mod tests {
.collect();
let total_incorrect_responses = events.iter().last().map(|(count, _)| *count).unwrap_or(0);
assert!(total_incorrect_responses <= ANCESTOR_HASH_REPAIR_SAMPLE_SIZE);
assert!(total_incorrect_responses <= get_ancestor_hash_repair_sample_size());
let mut event_order: Vec<usize> = (0..sampled_addresses.len()).collect();
event_order.shuffle(&mut thread_rng());
@ -528,7 +667,7 @@ pub mod tests {
let desired_incorrect_responses = vec![
(
incorrect_ancestors_response_0,
MINIMUM_ANCESTOR_AGREEMENT_SIZE - 1,
get_minimum_ancestor_agreement_size() - 1,
),
(incorrect_ancestors_response_1, 2),
];
@ -539,8 +678,8 @@ pub mod tests {
.map(|(_, count)| count)
.sum();
assert!(
ANCESTOR_HASH_REPAIR_SAMPLE_SIZE - total_invalid_responses
< MINIMUM_ANCESTOR_AGREEMENT_SIZE
get_ancestor_hash_repair_sample_size() - total_invalid_responses
< get_minimum_ancestor_agreement_size()
);
assert_eq!(
@ -561,7 +700,7 @@ pub mod tests {
let incorrect_ancestors_response = vec![];
let desired_incorrect_responses = vec![(
incorrect_ancestors_response,
MINIMUM_ANCESTOR_AGREEMENT_SIZE,
get_minimum_ancestor_agreement_size(),
)];
assert_eq!(
@ -582,7 +721,7 @@ pub mod tests {
let incorrect_ancestors_response = vec![(request_slot - 1, Hash::new_unique())];
let desired_incorrect_responses = vec![(
incorrect_ancestors_response,
MINIMUM_ANCESTOR_AGREEMENT_SIZE,
get_minimum_ancestor_agreement_size(),
)];
assert_eq!(
@ -605,7 +744,7 @@ pub mod tests {
incorrect_ancestors_response.push((request_slot + 1, Hash::new_unique()));
let desired_incorrect_responses = vec![(
incorrect_ancestors_response,
MINIMUM_ANCESTOR_AGREEMENT_SIZE,
get_minimum_ancestor_agreement_size(),
)];
assert_eq!(
@ -627,7 +766,7 @@ pub mod tests {
incorrect_ancestors_response.swap_remove(0);
let desired_incorrect_responses = vec![(
incorrect_ancestors_response,
MINIMUM_ANCESTOR_AGREEMENT_SIZE,
get_minimum_ancestor_agreement_size(),
)];
assert_eq!(
@ -657,7 +796,7 @@ pub mod tests {
incorrect_ancestors_response[5].1 = Hash::new_unique();
let desired_incorrect_responses = vec![(
incorrect_ancestors_response,
MINIMUM_ANCESTOR_AGREEMENT_SIZE,
get_minimum_ancestor_agreement_size(),
)];
assert_eq!(
@ -680,7 +819,7 @@ pub mod tests {
incorrect_ancestors_response.push((request_slot, Hash::new_unique()));
let desired_incorrect_responses = vec![(
incorrect_ancestors_response,
MINIMUM_ANCESTOR_AGREEMENT_SIZE - 1,
get_minimum_ancestor_agreement_size() - 1,
)];
// We have no entries in the blockstore, so all the ancestors will be missing
@ -723,12 +862,12 @@ pub mod tests {
// Here we either skip slot 93 or 94.
//
// 1) If we skip slot 93, and insert mismatched slot 94 we're testing the order of
// events `Not frozen -> Mismatched hash`
// events `Not frozen -> Mismatched hash` which should return
// `EarliestAncestorNotFrozen`
//
// 2) If we insert mismatched slot 93, and skip slot 94 we're testing the order of
// events `Mismatched hash -> Not frozen`
//
// Both cases should return `EarliestMismatchFound`
// events `Mismatched hash -> Not frozen`, which should return
// `EarliestMismatchFound`
test_setup
.blockstore
.insert_bank_hash(slot, Hash::new_unique(), false);
@ -794,7 +933,7 @@ pub mod tests {
// Set up a situation where some of our ancestors are correct,
// but then we fork off with different versions of the correct slots.
// ```
// 93' - 94' - 95' - 96' - 97' - 98' - 99' - 100' (our current fork, missing some slots like 98)
// 93' - 94' - 95' - 96' - 97' - 98' - 99' - 100' (our current fork)
// /
// 90 - 91 - 92 (all correct)
// \
@ -850,4 +989,133 @@ pub mod tests {
DuplicateAncestorDecision::AncestorsAllMatch
);
}
#[test]
fn test_add_multiple_responses_pruned_all_mismatch() {
    // Pruned-fork request where nothing in the response can be verified
    // locally: the blockstore is empty, so every reported ancestor is
    // treated as a mismatch and repair must continue searching.
    let request_slot = 100;
    let mut test_setup = setup_add_response_test_pruned(request_slot, 10);

    // We have no entries in the blockstore, so all the ancestors will be missing
    match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) {
        DuplicateAncestorDecision::ContinueSearch(repair_status) => {
            // The full set of correct ancestors should be scheduled for repair
            assert_eq!(
                repair_status.correct_ancestors_to_repair,
                test_setup.correct_ancestors_response
            );
        }
        x => panic!("Incorrect decision {x:?}"),
    };
}
#[test]
fn test_add_multiple_responses_pruned_all_match() {
    // Pruned-fork request where the local blockstore already agrees with
    // every reported ancestor, so no repair is needed.
    let request_slot = 100;
    let mut test_setup = setup_add_response_test_pruned(request_slot, 10);

    // Insert all the correct ancestors
    let tree = test_setup
        .correct_ancestors_response
        .iter()
        .fold(tr(request_slot + 1), |tree, (slot, _)| (tr(*slot) / tree));
    test_setup
        .blockstore
        .add_tree(tree, true, true, 2, Hash::default());

    // All the ancestors matched
    assert_eq!(
        run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup),
        DuplicateAncestorDecision::AncestorsAllMatch
    );
}
#[test]
fn test_add_multiple_responses_pruned_some_ancestors_missing() {
    // Pruned-fork request where our local fork shares a prefix with the
    // correct fork but skips some slots; the earliest divergence on the
    // pruned path should be reported for repair.
    let request_slot = 100;
    let mut test_setup = setup_add_response_test_pruned(request_slot, 10);

    // Set up a situation where some of our ancestors are correct,
    // but then we fork off and are missing some ancestors like so:
    // ```
    //     93 - 95 - 97 - 99 - 100 (our current fork, missing some slots like 98)
    //    /
    // 90 - 91 - 92 (all correct)
    //    \
    //     93 - 94 - 95 - 96 - 97 - 98 - 99 - 100 (correct fork)
    // ```
    let tree = test_setup
        .correct_ancestors_response
        .iter()
        .filter(|(slot, _)| *slot <= 92 || *slot % 2 == 1)
        .fold(tr(request_slot), |tree, (slot, _)| (tr(*slot) / tree));
    test_setup
        .blockstore
        .add_tree(tree, true, true, 2, Hash::default());

    let repair_status =
        match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) {
            DuplicateAncestorDecision::EarliestPrunedMismatchFound(repair_status) => {
                repair_status
            }
            x => panic!("Incorrect decision {x:?}"),
        };

    // Expect to find everything after 93 in the `correct_ancestors_to_repair`.
    let expected_mismatched_slots: Vec<(Slot, Hash)> = test_setup
        .correct_ancestors_response
        .into_iter()
        .filter(|(slot, _)| *slot > 93)
        .collect();
    assert_eq!(
        repair_status.correct_ancestors_to_repair,
        expected_mismatched_slots
    );
}
#[test]
fn test_add_multiple_responses_pruned_ancestor_is_bad() {
    // Pruned-fork request where exactly one bad (duplicate) ancestor caused
    // our fork to descend from the pruned branch; everything from the first
    // divergent slot onward should be reported for repair.
    let request_slot = 100;
    let mut test_setup = setup_add_response_test_pruned(request_slot, 10);

    // Set up the situation we expect to see, exactly 1 duplicate has caused this branch to
    // descend from pruned.
    // ```
    // Our fork view:
    //           90 - 91 - 92
    // 10 - 11 - 93 - 94 - 95 - 96 - 97 - 98 - 99 - 100
    //
    // Correct fork:
    // 90 - 91 - 92 - 93 - 94 - 95 - 96 - 97 - 98 - 99 - 100
    // ```
    let root_fork = tr(90) / (tr(91) / tr(92));
    let pruned_fork = [10, 11, 93, 94, 95, 96, 97, 98, 99]
        .iter()
        .rev()
        .fold(tr(100), |tree, slot| (tr(*slot) / tree));
    test_setup
        .blockstore
        .add_tree(root_fork, true, true, 2, Hash::default());
    test_setup
        .blockstore
        .add_tree(pruned_fork, true, true, 2, Hash::default());

    let repair_status =
        match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) {
            DuplicateAncestorDecision::EarliestPrunedMismatchFound(repair_status) => {
                repair_status
            }
            x => panic!("Incorrect decision {x:?}"),
        };

    // Expect to find everything after 92 in the `correct_ancestors_to_repair`.
    let expected_mismatched_slots: Vec<(Slot, Hash)> = test_setup
        .correct_ancestors_response
        .into_iter()
        .filter(|(slot, _)| *slot >= 93)
        .collect();
    assert_eq!(
        repair_status.correct_ancestors_to_repair,
        expected_mismatched_slots
    );
}
}

View File

@ -498,6 +498,10 @@ impl HeaviestSubtreeForkChoice {
.map(|(slot_hash, fork_info)| (slot_hash, fork_info.stake_voted_subtree))
}
/// Iterate over every slot currently tracked by this fork-choice
/// structure, in arbitrary (hash-map) order.
pub fn slots_iter(&self) -> impl Iterator<Item = Slot> + '_ {
    self.fork_infos.keys().map(|(slot, _)| slot).copied()
}
/// Split off the node at `slot_hash_key` and propagate the stake subtraction up to the root of the
/// tree.
///

View File

@ -10,6 +10,7 @@ use {
ancestor_hashes_service::{AncestorHashesReplayUpdateReceiver, AncestorHashesService},
cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_slots::ClusterSlots,
duplicate_repair_status::AncestorDuplicateSlotsToRepair,
outstanding_requests::OutstandingRequests,
repair_weight::RepairWeight,
serve_repair::{ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY},
@ -49,13 +50,15 @@ use {
const DEFER_REPAIR_THRESHOLD: Duration = Duration::from_millis(200);
const DEFER_REPAIR_THRESHOLD_TICKS: u64 = DEFER_REPAIR_THRESHOLD.as_millis() as u64 / MS_PER_TICK;
pub type DuplicateSlotsResetSender = CrossbeamSender<Vec<(Slot, Hash)>>;
pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Vec<(Slot, Hash)>>;
pub type AncestorDuplicateSlotsSender = CrossbeamSender<AncestorDuplicateSlotsToRepair>;
pub type AncestorDuplicateSlotsReceiver = CrossbeamReceiver<AncestorDuplicateSlotsToRepair>;
pub type ConfirmedSlotsSender = CrossbeamSender<Vec<Slot>>;
pub type ConfirmedSlotsReceiver = CrossbeamReceiver<Vec<Slot>>;
pub type DumpedSlotsSender = CrossbeamSender<Vec<(Slot, Hash)>>;
pub type DumpedSlotsReceiver = CrossbeamReceiver<Vec<(Slot, Hash)>>;
pub type OutstandingShredRepairs = OutstandingRequests<ShredRepairType>;
pub type PopularPrunedForksSender = CrossbeamSender<Vec<Slot>>;
pub type PopularPrunedForksReceiver = CrossbeamReceiver<Vec<Slot>>;
#[derive(Default, Debug)]
pub struct SlotRepairs {
@ -197,7 +200,7 @@ pub struct RepairInfo {
pub cluster_info: Arc<ClusterInfo>,
pub cluster_slots: Arc<ClusterSlots>,
pub epoch_schedule: EpochSchedule,
pub duplicate_slots_reset_sender: DuplicateSlotsResetSender,
pub ancestor_duplicate_slots_sender: AncestorDuplicateSlotsSender,
// Validators from which repairs are requested
pub repair_validators: Option<HashSet<Pubkey>>,
// Validators which should be given priority when serving
@ -224,6 +227,7 @@ pub struct RepairService {
}
impl RepairService {
#[allow(clippy::too_many_arguments)]
pub fn new(
blockstore: Arc<Blockstore>,
exit: Arc<AtomicBool>,
@ -234,6 +238,7 @@ impl RepairService {
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
dumped_slots_receiver: DumpedSlotsReceiver,
popular_pruned_forks_sender: PopularPrunedForksSender,
) -> Self {
let t_repair = {
let blockstore = blockstore.clone();
@ -250,6 +255,7 @@ impl RepairService {
verified_vote_receiver,
&outstanding_requests,
dumped_slots_receiver,
popular_pruned_forks_sender,
)
})
.unwrap()
@ -277,6 +283,7 @@ impl RepairService {
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
dumped_slots_receiver: DumpedSlotsReceiver,
popular_pruned_forks_sender: PopularPrunedForksSender,
) {
let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
let serve_repair = ServeRepair::new(
@ -290,6 +297,7 @@ impl RepairService {
let mut best_repairs_stats = BestRepairsStats::default();
let mut last_stats = Instant::now();
let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY);
let mut popular_pruned_forks_requests = HashSet::new();
loop {
if exit.load(Ordering::Relaxed) {
@ -329,7 +337,12 @@ impl RepairService {
// question would have already been purged in `repair_weight.set_root`
// and there is no chance of it being part of the rooted path.
if slot >= repair_weight.root() {
repair_weight.split_off(slot);
let dumped_slots = repair_weight.split_off(slot);
// Remove from outstanding ancestor hashes requests. Also clean any
// requests that might have been since fixed
popular_pruned_forks_requests.retain(|slot| {
!dumped_slots.contains(slot) && repair_weight.is_pruned(*slot)
});
}
}
});
@ -371,6 +384,32 @@ impl RepairService {
&mut best_repairs_stats,
);
let mut popular_pruned_forks = repair_weight.get_popular_pruned_forks(
root_bank.epoch_stakes_map(),
root_bank.epoch_schedule(),
);
// Check if we've already sent a request along this pruned fork
popular_pruned_forks.retain(|slot| {
if popular_pruned_forks_requests
.iter()
.any(|prev_req_slot| repair_weight.same_tree(*slot, *prev_req_slot))
{
false
} else {
popular_pruned_forks_requests.insert(*slot);
true
}
});
if !popular_pruned_forks.is_empty() {
warn!(
"Notifying repair of popular pruned forks {:?}",
popular_pruned_forks
);
popular_pruned_forks_sender
.send(popular_pruned_forks)
.unwrap_or_else(|err| error!("failed to send popular pruned forks {err}"));
}
repairs
};

View File

@ -4,7 +4,9 @@ use {
repair_generic_traversal::{get_closest_completion, get_unknown_last_index},
repair_service::{BestRepairsStats, RepairTiming},
repair_weighted_traversal,
replay_stage::DUPLICATE_THRESHOLD,
serve_repair::ShredRepairType,
tree_diff::TreeDiff,
},
solana_ledger::{
ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_meta::SlotMeta,
@ -33,6 +35,14 @@ impl TreeRoot {
pub fn is_pruned(&self) -> bool {
matches!(self, Self::PrunedRoot(_))
}
#[cfg(test)]
pub fn slot(&self) -> Slot {
    // Both variants carry the slot; an or-pattern extracts it uniformly.
    match self {
        Self::Root(slot) | Self::PrunedRoot(slot) => *slot,
    }
}
}
impl From<TreeRoot> for Slot {
@ -43,10 +53,16 @@ impl From<TreeRoot> for Slot {
}
}
}
#[derive(Clone)]
pub struct RepairWeight {
// Map from root -> a subtree rooted at that `root`
trees: HashMap<Slot, HeaviestSubtreeForkChoice>,
// Map from root -> pruned subtree
// In the case of duplicate blocks linking back to a slot which is pruned, it is important to
// hold onto pruned trees so that we can repair / ancestor hashes repair when necessary. Since
// the parent slot is pruned these blocks will never be replayed / marked dead, so the existing
// dead duplicate confirmed pathway will not catch this special case.
// We manage the size by removing slots < root
pruned_trees: HashMap<Slot, HeaviestSubtreeForkChoice>,
@ -172,7 +188,7 @@ impl RepairWeight {
for (tree_root, updates) in all_subtree_updates {
let tree = self
.get_tree(&tree_root)
.get_tree_mut(tree_root)
.expect("Tree for `tree_root` must exist here");
let updates: Vec<_> = updates.into_iter().collect();
tree.add_votes(
@ -297,17 +313,18 @@ impl RepairWeight {
/// This function removes and destroys the original `ST`.
///
/// Assumes that `slot` is greater than `self.root`.
pub fn split_off(&mut self, slot: Slot) {
/// Returns slots that were orphaned
pub fn split_off(&mut self, slot: Slot) -> HashSet<Slot> {
assert!(slot >= self.root);
if slot == self.root {
error!("Trying to orphan root of repair tree {}", slot);
return;
return HashSet::new();
}
match self.slot_to_tree.get(&slot).copied() {
Some(TreeRoot::Root(subtree_root)) => {
if subtree_root == slot {
info!("{} is already orphan, skipping", slot);
return;
return HashSet::new();
}
let subtree = self
.trees
@ -316,6 +333,7 @@ impl RepairWeight {
let orphaned_tree = subtree.split_off(&(slot, Hash::default()));
self.rename_tree_root(&orphaned_tree, TreeRoot::Root(slot));
self.trees.insert(slot, orphaned_tree);
self.trees.get(&slot).unwrap().slots_iter().collect()
}
Some(TreeRoot::PrunedRoot(subtree_root)) => {
// Even if these orphaned slots were previously pruned, they should be added back to
@ -341,11 +359,17 @@ impl RepairWeight {
// back into the main set of trees, self.trees
self.rename_tree_root(&subtree, TreeRoot::Root(subtree_root));
self.trees.insert(subtree_root, subtree);
self.trees
.get(&subtree_root)
.unwrap()
.slots_iter()
.collect()
} else {
let orphaned_tree = subtree.split_off(&(slot, Hash::default()));
self.pruned_trees.insert(subtree_root, subtree);
self.rename_tree_root(&orphaned_tree, TreeRoot::Root(slot));
self.trees.insert(slot, orphaned_tree);
self.trees.get(&slot).unwrap().slots_iter().collect()
}
}
None => {
@ -353,6 +377,7 @@ impl RepairWeight {
"Trying to split off slot {} which doesn't currently exist in repair",
slot
);
HashSet::new()
}
}
}
@ -710,17 +735,111 @@ impl RepairWeight {
Some(orphan_tree_root)
}
fn get_tree(&mut self, tree_root: &TreeRoot) -> Option<&mut HeaviestSubtreeForkChoice> {
/// If any pruned trees reach the `DUPLICATE_THRESHOLD`, there is a high chance that they are
/// duplicate confirmed (can't say for sure because we don't differentiate by hash in
/// `repair_weight`).
/// For each such pruned tree, find the deepest child which has reached `DUPLICATE_THRESHOLD`
/// for handling in ancestor hashes repair
/// We refer to such trees as "popular" pruned forks, and the deepest child as the "popular" pruned
/// slot of the fork.
///
/// `DUPLICATE_THRESHOLD` is expected to be > 50%.
/// It is expected that no two children of a parent could both reach `DUPLICATE_THRESHOLD`.
pub fn get_popular_pruned_forks(
    &self,
    epoch_stakes: &HashMap<Epoch, EpochStakes>,
    epoch_schedule: &EpochSchedule,
) -> Vec<Slot> {
    // Compile-time sanity check (test builds only) of the > 50% assumption
    // the deepest-child search below relies on.
    #[cfg(test)]
    static_assertions::const_assert!(DUPLICATE_THRESHOLD > 0.5);
    let mut repairs = vec![];
    for (pruned_root, pruned_tree) in self.pruned_trees.iter() {
        // Start the downward search from the pruned tree's root; `Hash::default()`
        // is the placeholder hash used for repair-weight tree keys.
        let mut slot_to_start_repair = (*pruned_root, Hash::default());

        // This pruned tree *could* span an epoch boundary. To be safe we use the
        // minimum DUPLICATE_THRESHOLD across slots in case of stake modification. This
        // *could* lead to a false positive.
        //
        // Additionally, we could have a case where a slot that reached `DUPLICATE_THRESHOLD`
        // no longer reaches threshold post epoch boundary due to stake modifications.
        //
        // Post boundary, we have 2 cases:
        //      1) The previously popular slot stays as the majority fork. In this
        //         case it will eventually reach the new duplicate threshold and
        //         validators missing the correct version will be able to trigger this pruned
        //         repair pathway.
        //      2) With the stake modifications, this previously popular slot no
        //         longer holds the majority stake. The remaining stake is now expected to
        //         reach consensus on a new fork post epoch boundary. Once this consensus is
        //         reached, validators on the popular pruned fork will be able to switch
        //         to the new majority fork.
        //
        // In either case, `HeaviestSubtreeForkChoice` updates the stake only when observing new
        // votes leading to a potential mixed bag of stakes being observed. It is safest to use
        // the minimum threshold from either side of the boundary.
        let min_total_stake = pruned_tree
            .slots_iter()
            .map(|slot| {
                epoch_stakes
                    .get(&epoch_schedule.get_epoch(slot))
                    .expect("Pruned tree cannot contain slots more than an epoch behind")
                    .total_stake()
            })
            .min()
            .expect("Pruned tree cannot be empty");
        let duplicate_confirmed_threshold =
            ((min_total_stake as f64) * DUPLICATE_THRESHOLD) as u64;

        // TODO: `HeaviestSubtreeForkChoice` subtracts and migrates stake as validators switch
        // forks within the rooted subtree, however `repair_weight` does not migrate stake
        // across subtrees. This could lead to an additional false positive if validators
        // switch post prune as stake added to a pruned tree it is never removed.
        // A further optimization could be to store an additional `latest_votes`
        // in `repair_weight` to manage switching across subtrees.
        if pruned_tree
            .stake_voted_subtree(&slot_to_start_repair)
            .expect("Root of tree must exist")
            >= duplicate_confirmed_threshold
        {
            // Search to find the deepest node that still has >= duplicate_confirmed_threshold (could
            // just use best slot but this is a slight optimization that will save us some iterations
            // in ancestor repair)
            while let Some(child) = pruned_tree
                .children(&slot_to_start_repair)
                .expect("Found earlier, this slot should exist")
                .find(|c| {
                    pruned_tree
                        .stake_voted_subtree(c)
                        .expect("Found in children must exist")
                        >= duplicate_confirmed_threshold
                })
            {
                slot_to_start_repair = *child;
            }
            // Only the slot (not the hash) is sent on for ancestor hashes repair
            repairs.push(slot_to_start_repair.0);
        }
    }
    repairs
}
/// Immutable lookup of the subtree rooted at `tree_root`, searching the
/// active set (`trees`) or the pruned set (`pruned_trees`) as appropriate.
fn get_tree(&self, tree_root: TreeRoot) -> Option<&HeaviestSubtreeForkChoice> {
    let (map, root) = match tree_root {
        TreeRoot::Root(r) => (&self.trees, r),
        TreeRoot::PrunedRoot(r) => (&self.pruned_trees, r),
    };
    map.get(&root)
}
fn remove_tree(&mut self, tree_root: &TreeRoot) -> Option<HeaviestSubtreeForkChoice> {
/// Mutable lookup of the subtree rooted at `tree_root`, searching the
/// active set (`trees`) or the pruned set (`pruned_trees`) as appropriate.
fn get_tree_mut(&mut self, tree_root: TreeRoot) -> Option<&mut HeaviestSubtreeForkChoice> {
    let (map, root) = match tree_root {
        TreeRoot::Root(r) => (&mut self.trees, r),
        TreeRoot::PrunedRoot(r) => (&mut self.pruned_trees, r),
    };
    map.get_mut(&root)
}
/// Remove and return the subtree rooted at `tree_root` from whichever
/// set (active or pruned) currently holds it.
fn remove_tree(&mut self, tree_root: TreeRoot) -> Option<HeaviestSubtreeForkChoice> {
    let (map, root) = match tree_root {
        TreeRoot::Root(r) => (&mut self.trees, r),
        TreeRoot::PrunedRoot(r) => (&mut self.pruned_trees, r),
    };
    map.remove(&root)
}
@ -728,6 +847,22 @@ impl RepairWeight {
self.slot_to_tree.get(&slot).copied()
}
/// Returns true iff `slot` is currently tracked and in a pruned tree
pub fn is_pruned(&self, slot: Slot) -> bool {
self.get_tree_root(slot)
.as_ref()
.map(TreeRoot::is_pruned)
.unwrap_or(false)
}
/// Returns true iff `slot1` and `slot2` are both tracked and belong to the same tree
pub fn same_tree(&self, slot1: Slot, slot2: Slot) -> bool {
    self.get_tree_root(slot1)
        .and_then(|tree_root| self.get_tree(tree_root))
        .map_or(false, |tree| tree.contains_block(&(slot2, Hash::default())))
}
/// Assumes that `new_tree_root` does not already exist in `self.trees`
fn insert_new_tree(&mut self, new_tree_root: Slot) {
assert!(!self.trees.contains_key(&new_tree_root));
@ -798,12 +933,12 @@ impl RepairWeight {
epoch_schedule: &EpochSchedule,
) {
// Update self.slot_to_tree to reflect the merge
let tree1 = self.remove_tree(&root1).expect("tree to merge must exist");
let tree1 = self.remove_tree(root1).expect("tree to merge must exist");
self.rename_tree_root(&tree1, root2);
// Merge trees
let tree2 = self
.get_tree(&root2)
.get_tree_mut(root2)
.expect("tree to be merged into must exist");
tree2.merge(
@ -1158,7 +1293,7 @@ mod test {
// Add a vote to a slot chaining to pruned
blockstore.add_tree(tr(6) / tr(20), true, true, 2, Hash::default());
let votes = vec![(23, vote_pubkeys.clone())];
let votes = vec![(23, vote_pubkeys.iter().take(1).copied().collect_vec())];
repair_weight.add_votes(
&blockstore,
votes.into_iter(),
@ -1184,6 +1319,55 @@ mod test {
TreeRoot::PrunedRoot(3)
);
// Pruned tree should now have 1 vote
assert_eq!(
repair_weight
.pruned_trees
.get(&3)
.unwrap()
.stake_voted_subtree(&(3, Hash::default()))
.unwrap(),
stake
);
assert_eq!(
repair_weight
.trees
.get(&2)
.unwrap()
.stake_voted_subtree(&(2, Hash::default()))
.unwrap(),
3 * stake
);
// Add the rest of the stake
let votes = vec![(23, vote_pubkeys.iter().skip(1).copied().collect_vec())];
repair_weight.add_votes(
&blockstore,
votes.into_iter(),
bank.epoch_stakes_map(),
bank.epoch_schedule(),
);
// Pruned tree should have all the stake as well
assert_eq!(
repair_weight
.pruned_trees
.get(&3)
.unwrap()
.stake_voted_subtree(&(3, Hash::default()))
.unwrap(),
3 * stake
);
assert_eq!(
repair_weight
.trees
.get(&2)
.unwrap()
.stake_voted_subtree(&(2, Hash::default()))
.unwrap(),
3 * stake
);
// Update root and trim pruned tree
repair_weight.set_root(10);
// Add a vote to an orphan, where earliest ancestor is unrooted, should still add as pruned
@ -1610,11 +1794,7 @@ mod test {
#[test]
fn test_set_root_pruned_tree_trim_and_cleanup() {
// Connect orphans to main fork
let blockstore = setup_orphans();
blockstore.add_tree(tr(2) / tr(8), true, true, 2, Hash::default());
blockstore.add_tree(tr(3) / (tr(9) / tr(20)), true, true, 2, Hash::default());
let blockstore = setup_big_forks();
let stake = 100;
let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(3, stake);
let votes = vec![
@ -1717,11 +1897,7 @@ mod test {
#[test]
fn test_set_root_pruned_tree_split() {
// Connect orphans to main fork
let blockstore = setup_orphans();
blockstore.add_tree(tr(2) / tr(8), true, true, 2, Hash::default());
blockstore.add_tree(tr(3) / (tr(9) / tr(20)), true, true, 2, Hash::default());
let blockstore = setup_big_forks();
let stake = 100;
let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(3, stake);
let votes = vec![
@ -2045,7 +2221,7 @@ mod test {
// Add 22 to `pruned_trees`, ancestor search should now
// chain it back to 20
repair_weight.remove_tree(&TreeRoot::Root(20)).unwrap();
repair_weight.remove_tree(TreeRoot::Root(20)).unwrap();
repair_weight.insert_new_pruned_tree(20);
assert_eq!(
repair_weight.find_ancestor_subtree_of_slot(&blockstore, 23),
@ -2222,6 +2398,245 @@ mod test {
assert_eq!(orphans, vec![0, 3]);
}
#[test]
#[test]
fn test_get_popular_pruned_forks() {
    // Exercises `get_popular_pruned_forks` on three pruned trees, ramping
    // stake from below DUPLICATE_THRESHOLD to above it, and verifies that
    // the deepest qualifying slot (not necessarily the leaf) is returned.
    let blockstore = setup_big_forks();
    let stake = 100;
    let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(10, stake);
    let epoch_stakes = bank.epoch_stakes_map();
    let epoch_schedule = bank.epoch_schedule();

    // Add a little stake for each fork
    let votes = vec![
        (4, vec![vote_pubkeys[0]]),
        (11, vec![vote_pubkeys[1]]),
        (6, vec![vote_pubkeys[2]]),
        (23, vec![vote_pubkeys[3]]),
    ];
    let mut repair_weight = RepairWeight::new(0);
    repair_weight.add_votes(
        &blockstore,
        votes.into_iter(),
        bank.epoch_stakes_map(),
        bank.epoch_schedule(),
    );

    // Set root to 4, there should now be 3 pruned trees with `stake`
    repair_weight.set_root(4);
    assert_eq!(repair_weight.trees.len(), 1);
    assert_eq!(repair_weight.pruned_trees.len(), 3);
    assert!(repair_weight
        .pruned_trees
        .iter()
        .all(
            |(root, pruned_tree)| pruned_tree.stake_voted_subtree(&(*root, Hash::default()))
                == Some(stake)
        ));

    // No fork has DUPLICATE_THRESHOLD, should not be any popular forks
    assert!(repair_weight
        .get_popular_pruned_forks(epoch_stakes, epoch_schedule)
        .is_empty());

    // 500 stake, still less than DUPLICATE_THRESHOLD, should not be any popular forks
    let five_votes = vote_pubkeys.iter().copied().take(5).collect_vec();
    let votes = vec![(11, five_votes.clone()), (6, five_votes)];
    repair_weight.add_votes(
        &blockstore,
        votes.into_iter(),
        bank.epoch_stakes_map(),
        bank.epoch_schedule(),
    );
    assert!(repair_weight
        .get_popular_pruned_forks(epoch_stakes, epoch_schedule)
        .is_empty());

    // 600 stake, since we voted for leaf, leaf should be returned
    let votes = vec![(11, vec![vote_pubkeys[5]]), (6, vec![vote_pubkeys[6]])];
    repair_weight.add_votes(
        &blockstore,
        votes.into_iter(),
        bank.epoch_stakes_map(),
        bank.epoch_schedule(),
    );
    assert_eq!(
        vec![6, 11],
        repair_weight
            .get_popular_pruned_forks(epoch_stakes, epoch_schedule)
            .into_iter()
            .sorted()
            .collect_vec()
    );

    // For the last pruned tree we leave 100 stake on 23 and 22 and put 600 stake on 20. We
    // should return 20 and not traverse the tree deeper
    let six_votes = vote_pubkeys.iter().copied().take(6).collect_vec();
    let votes = vec![(20, six_votes)];
    repair_weight.add_votes(
        &blockstore,
        votes.into_iter(),
        bank.epoch_stakes_map(),
        bank.epoch_schedule(),
    );
    assert_eq!(
        vec![6, 11, 20],
        repair_weight
            .get_popular_pruned_forks(epoch_stakes, epoch_schedule)
            .into_iter()
            .sorted()
            .collect_vec()
    );
}
#[test]
fn test_get_popular_pruned_forks_forks() {
let blockstore = setup_big_forks();
let stake = 100;
let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(10, stake);
let epoch_stakes = bank.epoch_stakes_map();
let epoch_schedule = bank.epoch_schedule();
// Add a little stake for each fork
let votes = vec![
(4, vec![vote_pubkeys[0]]),
(11, vec![vote_pubkeys[1]]),
(6, vec![vote_pubkeys[2]]),
(23, vec![vote_pubkeys[3]]),
];
let mut repair_weight = RepairWeight::new(0);
repair_weight.add_votes(
&blockstore,
votes.into_iter(),
bank.epoch_stakes_map(),
bank.epoch_schedule(),
);
// Prune the entire tree
std::mem::swap(&mut repair_weight.trees, &mut repair_weight.pruned_trees);
repair_weight
.slot_to_tree
.iter_mut()
.for_each(|(_, s)| *s = TreeRoot::PrunedRoot(s.slot()));
// Traverse to 20
let mut repair_weight_20 = repair_weight.clone();
repair_weight_20.add_votes(
&blockstore,
vec![(20, vote_pubkeys.clone())].into_iter(),
bank.epoch_stakes_map(),
bank.epoch_schedule(),
);
assert_eq!(
vec![20],
repair_weight_20.get_popular_pruned_forks(epoch_stakes, epoch_schedule)
);
// 4 and 8 individually do not have enough stake, but 2 is popular
let votes = vec![(10, vote_pubkeys.iter().copied().skip(6).collect_vec())];
repair_weight.add_votes(
&blockstore,
votes.into_iter(),
bank.epoch_stakes_map(),
bank.epoch_schedule(),
);
assert_eq!(
vec![2],
repair_weight.get_popular_pruned_forks(epoch_stakes, epoch_schedule)
);
}
#[test]
fn test_get_popular_pruned_forks_stake_change_across_epoch_boundary() {
let blockstore = setup_big_forks();
let stake = 100;
let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(10, stake);
let mut epoch_stakes = bank.epoch_stakes_map().clone();
let mut epoch_schedule = *bank.epoch_schedule();
// Simulate epoch boundary at slot 10, where half of the stake deactivates
// Additional epoch boundary at slot 20, where 30% of the stake reactivates
let initial_stakes = epoch_stakes
.get(&epoch_schedule.get_epoch(0))
.unwrap()
.clone();
let mut dec_stakes = epoch_stakes
.get(&epoch_schedule.get_epoch(0))
.unwrap()
.clone();
let mut inc_stakes = epoch_stakes
.get(&epoch_schedule.get_epoch(0))
.unwrap()
.clone();
epoch_schedule.first_normal_slot = 0;
epoch_schedule.slots_per_epoch = 10;
assert_eq!(
epoch_schedule.get_epoch(10),
epoch_schedule.get_epoch(9) + 1
);
assert_eq!(
epoch_schedule.get_epoch(20),
epoch_schedule.get_epoch(19) + 1
);
dec_stakes.set_total_stake(dec_stakes.total_stake() - 5 * stake);
inc_stakes.set_total_stake(dec_stakes.total_stake() + 3 * stake);
epoch_stakes.insert(epoch_schedule.get_epoch(0), initial_stakes);
epoch_stakes.insert(epoch_schedule.get_epoch(10), dec_stakes);
epoch_stakes.insert(epoch_schedule.get_epoch(20), inc_stakes);
// Add a little stake for each fork
let votes = vec![
(4, vec![vote_pubkeys[0]]),
(11, vec![vote_pubkeys[1]]),
(6, vec![vote_pubkeys[2]]),
(23, vec![vote_pubkeys[3]]),
];
let mut repair_weight = RepairWeight::new(0);
repair_weight.add_votes(
&blockstore,
votes.into_iter(),
bank.epoch_stakes_map(),
bank.epoch_schedule(),
);
// Set root to 4, there should now be 3 pruned trees with `stake`
repair_weight.set_root(4);
assert_eq!(repair_weight.trees.len(), 1);
assert_eq!(repair_weight.pruned_trees.len(), 3);
assert!(repair_weight
.pruned_trees
.iter()
.all(
|(root, pruned_tree)| pruned_tree.stake_voted_subtree(&(*root, Hash::default()))
== Some(stake)
));
// No fork hash `DUPLICATE_THRESHOLD`, should not be any popular forks
assert!(repair_weight
.get_popular_pruned_forks(&epoch_stakes, &epoch_schedule)
.is_empty());
// 400 stake, For the 6 tree it will be less than `DUPLICATE_THRESHOLD`, however 11
// has epoch modifications where at some point 400 stake is enough. For 22, although it
// does cross the second epoch where the stake requirement was less, because it doesn't
// have any blocks in that epoch the minimum total stake is still 800 which fails.
let four_votes = vote_pubkeys.iter().copied().take(4).collect_vec();
let votes = vec![
(11, four_votes.clone()),
(6, four_votes.clone()),
(22, four_votes),
];
repair_weight.add_votes(
&blockstore,
votes.into_iter(),
bank.epoch_stakes_map(),
bank.epoch_schedule(),
);
assert_eq!(
vec![11],
repair_weight.get_popular_pruned_forks(&epoch_stakes, &epoch_schedule)
);
}
fn setup_orphan_repair_weight() -> (Blockstore, Bank, RepairWeight) {
let blockstore = setup_orphans();
let stake = 100;
@ -2316,6 +2731,32 @@ mod test {
blockstore
}
// Builds the `setup_orphans` blockstore and then connects the orphan
// branches (rooted at 8 and 9) back to the main fork, producing the full
// fork structure diagrammed below.
fn setup_big_forks() -> Blockstore {
    /*
        Build fork structure:
             slot 0
               |
             slot 1
             /    \
    /----|        |----|
  slot 2               |
   /   \             slot 3
slot 4  slot 8      /       \
  |       |     slot 5     slot 9
slot 10   |       |           |
  |     slot 6  slot 20
slot 11
                              |
                           slot 22
                              |
                           slot 23
    */
    let blockstore = setup_orphans();
    // Connect orphans to main fork
    blockstore.add_tree(tr(2) / tr(8), true, true, 2, Hash::default());
    blockstore.add_tree(tr(3) / (tr(9) / tr(20)), true, true, 2, Hash::default());
    blockstore
}
fn setup_forks() -> Blockstore {
/*
Build fork structure:

View File

@ -18,11 +18,14 @@ use {
SWITCH_FORK_THRESHOLD,
},
cost_update_service::CostUpdate,
duplicate_repair_status::AncestorDuplicateSlotsToRepair,
fork_choice::{ForkChoice, SelectVoteAndResetForkResult},
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks,
progress_map::{ForkProgress, ProgressMap, PropagatedStats, ReplaySlotStats},
repair_service::{DumpedSlotsSender, DuplicateSlotsResetReceiver},
repair_service::{
AncestorDuplicateSlotsReceiver, DumpedSlotsSender, PopularPrunedForksReceiver,
},
rewards_recorder_service::{RewardsMessage, RewardsRecorderSender},
tower_storage::{SavedTower, SavedTowerVersions, TowerStorage},
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
@ -268,9 +271,11 @@ pub struct ReplayTiming {
wait_receive_elapsed: u64,
heaviest_fork_failures_elapsed: u64,
bank_count: u64,
process_ancestor_hashes_duplicate_slots_elapsed: u64,
process_gossip_duplicate_confirmed_slots_elapsed: u64,
process_duplicate_slots_elapsed: u64,
process_unfrozen_gossip_verified_vote_hashes_elapsed: u64,
process_popular_pruned_forks_elapsed: u64,
repair_correct_slots_elapsed: u64,
retransmit_not_propagated_elapsed: u64,
generate_new_bank_forks_read_lock_us: u64,
@ -296,8 +301,10 @@ impl ReplayTiming {
wait_receive_elapsed: u64,
heaviest_fork_failures_elapsed: u64,
bank_count: u64,
process_ancestor_hashes_duplicate_slots_elapsed: u64,
process_gossip_duplicate_confirmed_slots_elapsed: u64,
process_unfrozen_gossip_verified_vote_hashes_elapsed: u64,
process_popular_pruned_forks_elapsed: u64,
process_duplicate_slots_elapsed: u64,
repair_correct_slots_elapsed: u64,
retransmit_not_propagated_elapsed: u64,
@ -315,10 +322,13 @@ impl ReplayTiming {
self.wait_receive_elapsed += wait_receive_elapsed;
self.heaviest_fork_failures_elapsed += heaviest_fork_failures_elapsed;
self.bank_count += bank_count;
self.process_ancestor_hashes_duplicate_slots_elapsed +=
process_ancestor_hashes_duplicate_slots_elapsed;
self.process_gossip_duplicate_confirmed_slots_elapsed +=
process_gossip_duplicate_confirmed_slots_elapsed;
self.process_unfrozen_gossip_verified_vote_hashes_elapsed +=
process_unfrozen_gossip_verified_vote_hashes_elapsed;
self.process_popular_pruned_forks_elapsed += process_popular_pruned_forks_elapsed;
self.process_duplicate_slots_elapsed += process_duplicate_slots_elapsed;
self.repair_correct_slots_elapsed += repair_correct_slots_elapsed;
self.retransmit_not_propagated_elapsed += retransmit_not_propagated_elapsed;
@ -381,6 +391,11 @@ impl ReplayTiming {
self.replay_active_banks_elapsed as i64,
i64
),
(
"process_ancestor_hashes_duplicate_slots_elapsed",
self.process_ancestor_hashes_duplicate_slots_elapsed as i64,
i64
),
(
"process_gossip_duplicate_confirmed_slots_elapsed",
self.process_gossip_duplicate_confirmed_slots_elapsed as i64,
@ -391,6 +406,11 @@ impl ReplayTiming {
self.process_unfrozen_gossip_verified_vote_hashes_elapsed as i64,
i64
),
(
"process_popular_pruned_forks_elapsed",
self.process_popular_pruned_forks_elapsed as i64,
i64
),
(
"wait_receive_elapsed",
self.wait_receive_elapsed as i64,
@ -468,7 +488,7 @@ impl ReplayStage {
vote_tracker: Arc<VoteTracker>,
cluster_slots: Arc<ClusterSlots>,
retransmit_slots_sender: RetransmitSlotsSender,
epoch_slots_frozen_receiver: DuplicateSlotsResetReceiver,
ancestor_duplicate_slots_receiver: AncestorDuplicateSlotsReceiver,
replay_vote_sender: ReplayVoteSender,
gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
@ -481,6 +501,7 @@ impl ReplayStage {
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
dumped_slots_sender: DumpedSlotsSender,
banking_tracer: Arc<BankingTracer>,
popular_pruned_forks_receiver: PopularPrunedForksReceiver,
) -> Result<Self, String> {
let mut tower = if let Some(process_blockstore) = maybe_process_blockstore {
let tower = process_blockstore.process_to_create_tower()?;
@ -626,13 +647,15 @@ impl ReplayStage {
let forks_root = bank_forks.read().unwrap().root();
// Reset any dead slots that have been frozen by a sufficient portion of
// the network. Signalled by repair_service.
let mut purge_dead_slots_time = Measure::start("purge_dead_slots");
Self::process_epoch_slots_frozen_dead_slots(
// Process cluster-agreed versions of duplicate slots for which we potentially
// have the wrong version. Our version was dead or pruned.
// Signalled by ancestor_hashes_service.
let mut process_ancestor_hashes_duplicate_slots_time =
Measure::start("process_ancestor_hashes_duplicate_slots");
Self::process_ancestor_hashes_duplicate_slots(
&my_pubkey,
&blockstore,
&epoch_slots_frozen_receiver,
&ancestor_duplicate_slots_receiver,
&mut duplicate_slots_tracker,
&gossip_duplicate_confirmed_slots,
&mut epoch_slots_frozen_slots,
@ -643,7 +666,7 @@ impl ReplayStage {
&ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
);
purge_dead_slots_time.stop();
process_ancestor_hashes_duplicate_slots_time.stop();
// Check for any newly confirmed slots detected from gossip.
let mut process_gossip_duplicate_confirmed_slots_time =
@ -678,6 +701,24 @@ impl ReplayStage {
for _ in gossip_verified_vote_hash_receiver.try_iter() {}
process_unfrozen_gossip_verified_vote_hashes_time.stop();
let mut process_popular_pruned_forks_time =
Measure::start("process_popular_pruned_forks_time");
// Check for "popular" (52+% stake aggregated across versions/descendants) forks
// that are pruned, which would not be detected by normal means.
// Signalled by `repair_service`.
Self::process_popular_pruned_forks(
&popular_pruned_forks_receiver,
&blockstore,
&mut duplicate_slots_tracker,
&mut epoch_slots_frozen_slots,
&bank_forks,
&mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
&mut purge_repair_slot_counter,
);
process_popular_pruned_forks_time.stop();
// Check to remove any duplicated slots from fork choice
let mut process_duplicate_slots_time = Measure::start("process_duplicate_slots");
if !tpu_has_bank {
@ -1053,8 +1094,10 @@ impl ReplayStage {
wait_receive_time.as_us(),
heaviest_fork_failures_time.as_us(),
u64::from(did_complete_bank),
process_ancestor_hashes_duplicate_slots_time.as_us(),
process_gossip_duplicate_confirmed_slots_time.as_us(),
process_unfrozen_gossip_verified_vote_hashes_time.as_us(),
process_popular_pruned_forks_time.as_us(),
process_duplicate_slots_time.as_us(),
dump_then_repair_correct_slots_time.as_us(),
retransmit_not_propagated_time.as_us(),
@ -1292,10 +1335,9 @@ impl ReplayStage {
}
} else {
warn!(
"Trying to dump slot {} which does not exist in bank forks",
"Dumping slot {} which does not exist in bank forks (possibly pruned)",
*duplicate_slot
);
return false;
}
@ -1357,10 +1399,10 @@ impl ReplayStage {
}
#[allow(clippy::too_many_arguments)]
fn process_epoch_slots_frozen_dead_slots(
fn process_ancestor_hashes_duplicate_slots(
pubkey: &Pubkey,
blockstore: &Blockstore,
epoch_slots_frozen_receiver: &DuplicateSlotsResetReceiver,
ancestor_duplicate_slots_receiver: &AncestorDuplicateSlotsReceiver,
duplicate_slots_tracker: &mut DuplicateSlotsTracker,
gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
@ -1372,13 +1414,17 @@ impl ReplayStage {
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) {
let root = bank_forks.read().unwrap().root();
for maybe_purgeable_duplicate_slots in epoch_slots_frozen_receiver.try_iter() {
for AncestorDuplicateSlotsToRepair {
slots_to_repair: maybe_repairable_duplicate_slots,
request_type,
} in ancestor_duplicate_slots_receiver.try_iter()
{
warn!(
"{} ReplayStage notified of epoch slots duplicate frozen dead slots: {:?}",
pubkey, maybe_purgeable_duplicate_slots
"{} ReplayStage notified of duplicate slots from ancestor hashes service but we observed as {}: {:?}",
pubkey, if request_type.is_pruned() {"pruned"} else {"dead"}, maybe_repairable_duplicate_slots,
);
for (epoch_slots_frozen_slot, epoch_slots_frozen_hash) in
maybe_purgeable_duplicate_slots.into_iter()
maybe_repairable_duplicate_slots.into_iter()
{
let epoch_slots_frozen_state = EpochSlotsFrozenState::new_from_state(
epoch_slots_frozen_slot,
@ -1393,6 +1439,7 @@ impl ReplayStage {
.get(epoch_slots_frozen_slot)
.map(|b| b.hash())
},
request_type.is_pruned(),
);
check_slot_agrees_with_cluster(
epoch_slots_frozen_slot,
@ -1531,6 +1578,40 @@ impl ReplayStage {
.expect("must exist based on earlier check");
}
#[allow(clippy::too_many_arguments)]
/// Drains every batch of newly-popular pruned slots signalled over
/// `popular_pruned_forks_receiver` and feeds each slot above the current root
/// through the slot state machine as a `PopularPrunedFork` update.
fn process_popular_pruned_forks(
    popular_pruned_forks_receiver: &PopularPrunedForksReceiver,
    blockstore: &Blockstore,
    duplicate_slots_tracker: &mut DuplicateSlotsTracker,
    epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
    bank_forks: &RwLock<BankForks>,
    fork_choice: &mut HeaviestSubtreeForkChoice,
    duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
    ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
    purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) {
    let root = bank_forks.read().unwrap().root();
    // Flatten all pending batches into one stream of slots; slots at or below
    // the root are already finalized and need no state-machine update.
    for pruned_slot in popular_pruned_forks_receiver
        .try_iter()
        .flatten()
        .filter(|slot| *slot > root)
    {
        check_slot_agrees_with_cluster(
            pruned_slot,
            root,
            blockstore,
            duplicate_slots_tracker,
            epoch_slots_frozen_slots,
            fork_choice,
            duplicate_slots_to_repair,
            ancestor_hashes_replay_update_sender,
            purge_repair_slot_counter,
            SlotStateUpdate::PopularPrunedFork,
        );
    }
}
// Check for any newly confirmed slots by the cluster. This only detects
// optimistic and in the future, duplicate slot confirmations on the exact
// single slots and does not account for votes on their descendants. Used solely

View File

@ -1,7 +1,7 @@
use {
crate::{
cluster_slots::ClusterSlots,
duplicate_repair_status::ANCESTOR_HASH_REPAIR_SAMPLE_SIZE,
duplicate_repair_status::get_ancestor_hash_repair_sample_size,
repair_response,
repair_service::{OutstandingShredRepairs, RepairStats, REPAIR_MS},
request_response::RequestResponse,
@ -65,11 +65,15 @@ pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 11;
pub(crate) const REPAIR_PEERS_CACHE_CAPACITY: usize = 128;
// Limit cache entries ttl in order to avoid re-using outdated data.
const REPAIR_PEERS_CACHE_TTL: Duration = Duration::from_secs(10);
#[cfg(test)]
static_assertions::const_assert_eq!(MAX_ANCESTOR_BYTES_IN_PACKET, 1220);
pub const MAX_ANCESTOR_BYTES_IN_PACKET: usize =
PACKET_DATA_SIZE -
SIZE_OF_NONCE -
4 /*(response version enum discriminator)*/ -
4 /*slot_hash length*/;
pub const MAX_ANCESTOR_RESPONSES: usize =
MAX_ANCESTOR_BYTES_IN_PACKET / std::mem::size_of::<(Slot, Hash)>();
/// Number of bytes in the randomly generated token sent with ping messages.
@ -1109,7 +1113,7 @@ impl ServeRepair {
let addr = repair_peers[i].serve_repair().ok()?;
Some((*repair_peers[i].pubkey(), addr))
})
.take(ANCESTOR_HASH_REPAIR_SAMPLE_SIZE)
.take(get_ancestor_hash_repair_sample_size())
.collect();
Ok(peers)
}

View File

@ -188,17 +188,18 @@ impl Tvu {
);
let cluster_slots = Arc::new(ClusterSlots::default());
let (duplicate_slots_reset_sender, duplicate_slots_reset_receiver) = unbounded();
let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = unbounded();
let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded();
let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
unbounded();
let (dumped_slots_sender, dumped_slots_receiver) = unbounded();
let (popular_pruned_forks_sender, popular_pruned_forks_receiver) = unbounded();
let window_service = {
let epoch_schedule = *bank_forks.read().unwrap().working_bank().epoch_schedule();
let repair_info = RepairInfo {
bank_forks: bank_forks.clone(),
epoch_schedule,
duplicate_slots_reset_sender,
ancestor_duplicate_slots_sender,
repair_validators: tvu_config.repair_validators,
repair_whitelist: tvu_config.repair_whitelist,
cluster_info: cluster_info.clone(),
@ -218,6 +219,7 @@ impl Tvu {
duplicate_slots_sender,
ancestor_hashes_replay_update_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
)
};
@ -290,7 +292,7 @@ impl Tvu {
vote_tracker,
cluster_slots,
retransmit_slots_sender,
duplicate_slots_reset_receiver,
ancestor_duplicate_slots_receiver,
replay_vote_sender,
gossip_confirmed_slots_receiver,
gossip_verified_vote_hash_receiver,
@ -303,6 +305,7 @@ impl Tvu {
prioritization_fee_cache.clone(),
dumped_slots_sender,
banking_tracer,
popular_pruned_forks_receiver,
)?;
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {

View File

@ -7,7 +7,10 @@ use {
cluster_info_vote_listener::VerifiedVoteReceiver,
completed_data_sets_service::CompletedDataSetsSender,
repair_response,
repair_service::{DumpedSlotsReceiver, OutstandingShredRepairs, RepairInfo, RepairService},
repair_service::{
DumpedSlotsReceiver, OutstandingShredRepairs, PopularPrunedForksSender, RepairInfo,
RepairService,
},
result::{Error, Result},
},
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
@ -316,6 +319,7 @@ impl WindowService {
duplicate_slots_sender: DuplicateSlotSender,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
dumped_slots_receiver: DumpedSlotsReceiver,
popular_pruned_forks_sender: PopularPrunedForksSender,
) -> WindowService {
let outstanding_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
@ -331,6 +335,7 @@ impl WindowService {
outstanding_requests.clone(),
ancestor_hashes_replay_update_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
);
let (duplicate_sender, duplicate_receiver) = unbounded();

View File

@ -3153,6 +3153,11 @@ impl Blockstore {
Ok(())
}
/// For tests only: directly overwrite the cached last root with `root`.
pub fn set_last_root(&mut self, root: Slot) {
    let mut last_root = self.last_root.write().unwrap();
    *last_root = root;
}
pub fn mark_slots_as_if_rooted_normally_at_startup(
&self,
slots: Vec<(Slot, Option<Hash>)>,

View File

@ -32,6 +32,7 @@ solana-thin-client = { workspace = true }
solana-tpu-client = { workspace = true }
solana-vote-program = { workspace = true }
tempfile = { workspace = true }
trees = { workspace = true }
[dev-dependencies]
assert_matches = { workspace = true }

View File

@ -58,6 +58,10 @@ pub fn last_vote_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<(Sl
restore_tower(tower_path, node_pubkey).map(|tower| tower.last_voted_slot_hash().unwrap())
}
/// Restores the persisted tower for `node_pubkey` from `tower_path` and
/// returns its root slot, or `None` if no tower could be restored.
pub fn last_root_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<Slot> {
    let tower = restore_tower(tower_path, node_pubkey)?;
    Some(tower.root())
}
pub fn restore_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<Tower> {
let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf());

View File

@ -8,7 +8,9 @@ use {
serial_test::serial,
solana_core::validator::ValidatorConfig,
solana_gossip::gossip_service::discover_cluster,
solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore},
solana_ledger::{
ancestor_iterator::AncestorIterator, blockstore::Blockstore, leader_schedule::FixedSchedule,
},
solana_local_cluster::{
cluster::Cluster,
cluster_tests,
@ -23,7 +25,13 @@ use {
signature::{Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
std::{collections::HashSet, sync::Arc, thread::sleep, time::Duration},
std::{
collections::HashSet,
sync::Arc,
thread::sleep,
time::{Duration, Instant},
},
trees::tr,
};
mod common;
@ -453,3 +461,281 @@ fn test_slot_hash_expiry() {
"test_slot_hashes_expiry",
);
}
// This test simulates a case where a leader sends a duplicate block with different ancestry. One
// version builds off of the rooted path, however the other version builds off a pruned branch. The
// validators that receive the pruned version will need to repair in order to continue, which
// requires an ancestor hashes repair.
//
// We setup 3 validators:
// - majority, will produce the rooted path
// - minority, will produce the pruned path
// - our_node, will be fed the pruned version of the duplicate block and need to repair
//
// Additionally we setup 3 observer nodes to propagate votes and participate in the ancestor hashes
// sample.
//
// Fork structure:
//
// 0 - 1 - ... - 10 (fork slot) - 30 - ... - 61 (rooted path) - ...
// |
// |- 11 - ... - 29 (pruned path) - 81'
//
//
// Steps:
// 1) Different leader schedule, minority thinks it produces 0-29 and majority rest, majority
// thinks minority produces all blocks. This is to avoid majority accidentally producing blocks
// before it shuts down.
// 2) Start cluster, kill our_node.
// 3) Kill majority cluster after it votes for any slot > fork slot (guarantees that the fork slot is
// reached as minority cannot pass threshold otherwise).
// 4) Let minority produce forks on pruned forks until out of leader slots then kill.
// 5) Truncate majority ledger past fork slot so it starts building off of fork slot.
// 6) Restart majority and wait until it starts producing blocks on the main fork and roots something
// past the fork slot.
// 7) Construct our ledger by copying majority ledger and copying blocks from minority for the pruned path.
// 8) In our node's ledger, change the parent of the latest slot in majority fork to be the latest
// slot in the minority fork (simulates duplicate built off of pruned block)
// 9) Start our node, which will prune the minority fork on ledger replay, and verify that we can make roots.
//
#[test]
#[serial]
fn test_duplicate_with_pruned_ancestor() {
    solana_logger::setup_with("info,solana_metrics=off");
    // Shrink the ancestor hashes repair sample so the 3 observer nodes below
    // form a sufficient sample for ancestor repair requests.
    solana_core::duplicate_repair_status::set_ancestor_hash_repair_sample_size_for_tests_only(3);

    let majority_leader_stake = 10_000_000 * DEFAULT_NODE_STAKE;
    let minority_leader_stake = 2_000_000 * DEFAULT_NODE_STAKE;
    let our_node = DEFAULT_NODE_STAKE;
    let observer_stake = DEFAULT_NODE_STAKE;

    let slots_per_epoch = 2048;
    // Slot at which the rooted and pruned paths diverge.
    let fork_slot: u64 = 10;
    // Number of leader slots the minority gets to build the pruned path.
    let fork_length: u64 = 20;
    // Extra slots the restarted majority must root past the minority fork tip.
    let majority_fork_buffer = 5;

    let mut node_stakes = vec![majority_leader_stake, minority_leader_stake, our_node];
    // We need enough observers to reach `ANCESTOR_HASH_REPAIR_SAMPLE_SIZE`
    node_stakes.append(&mut vec![observer_stake; 3]);

    let num_nodes = node_stakes.len();

    // Fixed keypairs so the custom leader schedules below are deterministic;
    // remaining (observer) nodes get fresh random keys.
    let validator_keys = vec![
        "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
        "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8",
        "4mx9yoFBeYasDKBGDWCTWGJdWuJCKbgqmuP8bN9umybCh5Jzngw7KQxe99Rf5uzfyzgba1i65rJW4Wqk7Ab5S8ye",
        "3zsEPEDsjfEay7te9XqNjRTCE7vwuT6u4DHzBJC19yp7GS8BuNRMRjnpVrKCBzb3d44kxc4KPGSHkCmk6tEfswCg",
    ]
    .iter()
    .map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
    .chain(std::iter::repeat_with(|| (Arc::new(Keypair::new()), true)))
    .take(node_stakes.len())
    .collect::<Vec<_>>();
    let validators = validator_keys
        .iter()
        .map(|(kp, _)| kp.pubkey())
        .collect::<Vec<_>>();
    let (majority_pubkey, minority_pubkey, our_node_pubkey) =
        (validators[0], validators[1], validators[2]);

    let mut default_config = ValidatorConfig::default_for_test();
    // Minority fork is leader long enough to create pruned fork
    let validator_to_slots = vec![
        (minority_pubkey, (fork_slot + fork_length) as usize),
        (majority_pubkey, slots_per_epoch as usize),
    ];
    let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter());
    default_config.fixed_leader_schedule = Some(FixedSchedule {
        leader_schedule: Arc::new(leader_schedule),
    });
    let mut validator_configs = make_identical_validator_configs(&default_config, num_nodes);
    // The first observer (index 3) is non-voting.
    validator_configs[3].voting_disabled = true;
    // Don't let majority produce anything past the fork by tricking its leader schedule
    validator_configs[0].fixed_leader_schedule = Some(FixedSchedule {
        leader_schedule: Arc::new(create_custom_leader_schedule(
            [(minority_pubkey, slots_per_epoch as usize)].into_iter(),
        )),
    });

    let mut config = ClusterConfig {
        cluster_lamports: DEFAULT_CLUSTER_LAMPORTS + node_stakes.iter().sum::<u64>(),
        node_stakes,
        validator_configs,
        validator_keys: Some(validator_keys),
        slots_per_epoch,
        stakers_slot_offset: slots_per_epoch,
        skip_warmup_slots: true,
        ..ClusterConfig::default()
    };
    let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);

    let majority_ledger_path = cluster.ledger_path(&majority_pubkey);
    let minority_ledger_path = cluster.ledger_path(&minority_pubkey);
    let our_node_ledger_path = cluster.ledger_path(&our_node_pubkey);

    info!(
        "majority {} ledger path {:?}",
        majority_pubkey, majority_ledger_path
    );
    info!(
        "minority {} ledger path {:?}",
        minority_pubkey, minority_ledger_path
    );
    info!(
        "our_node {} ledger path {:?}",
        our_node_pubkey, our_node_ledger_path
    );

    // Step 2: our_node stays down until its ledger has been hand-crafted below.
    info!("Killing our node");
    let our_node_info = cluster.exit_node(&our_node_pubkey);

    // Step 3: wait for the majority to vote past the fork slot, then kill it.
    info!("Waiting on majority validator to vote on at least {fork_slot}");
    let now = Instant::now();
    let mut last_majority_vote = 0;
    loop {
        let elapsed = now.elapsed();
        assert!(
            elapsed <= Duration::from_secs(30),
            "Majority validator failed to vote on a slot >= {} in {} secs,
            majority validator last vote: {}",
            fork_slot,
            elapsed.as_secs(),
            last_majority_vote,
        );
        sleep(Duration::from_millis(100));

        if let Some((last_vote, _)) = last_vote_in_tower(&majority_ledger_path, &majority_pubkey) {
            last_majority_vote = last_vote;
            if last_vote >= fork_slot {
                break;
            }
        }
    }

    // Step 4: let the minority extend what will become the pruned path.
    info!("Killing majority validator, waiting for minority fork to reach a depth of at least 15",);

    let mut majority_validator_info = cluster.exit_node(&majority_pubkey);

    let now = Instant::now();
    let mut last_minority_vote = 0;
    // NOTE(review): no sleep here, so this polls the tower file in a tight loop
    // until the minority fork is deep enough — confirm this busy-wait is intended.
    while last_minority_vote < fork_slot + 15 {
        let elapsed = now.elapsed();
        assert!(
            elapsed <= Duration::from_secs(30),
            "Minority validator failed to create a fork of depth >= {} in {} secs,
            last_minority_vote: {}",
            15,
            elapsed.as_secs(),
            last_minority_vote,
        );

        if let Some((last_vote, _)) = last_vote_in_tower(&minority_ledger_path, &minority_pubkey) {
            last_minority_vote = last_vote;
        }
    }

    info!(
        "Killing minority validator, fork created successfully: {:?}",
        last_minority_vote
    );
    let last_minority_vote =
        wait_for_last_vote_in_tower_to_land_in_ledger(&minority_ledger_path, &minority_pubkey);
    let minority_validator_info = cluster.exit_node(&minority_pubkey);

    // Step 5: truncate the majority ledger back to the fork slot so its
    // restart builds the rooted path from there.
    info!("Truncating majority validator ledger to {fork_slot}");
    {
        remove_tower(&majority_ledger_path, &majority_pubkey);
        let blockstore = open_blockstore(&majority_ledger_path);
        purge_slots_with_count(&blockstore, fork_slot + 1, 100);
    }

    // Step 6: restart the majority and wait for it to root past the fork.
    info!("Restarting majority validator");
    // Make sure we don't send duplicate votes
    majority_validator_info.config.wait_to_vote_slot = Some(fork_slot + fork_length);
    // Fix the leader schedule so we can produce blocks
    majority_validator_info.config.fixed_leader_schedule =
        minority_validator_info.config.fixed_leader_schedule.clone();
    cluster.restart_node(
        &majority_pubkey,
        majority_validator_info,
        SocketAddrSpace::Unspecified,
    );

    let mut last_majority_root = 0;
    let now = Instant::now();
    info!(
        "Waiting for majority validator to root something past {}",
        fork_slot + fork_length + majority_fork_buffer
    );
    while last_majority_root <= fork_slot + fork_length + majority_fork_buffer {
        let elapsed = now.elapsed();
        assert!(
            elapsed <= Duration::from_secs(60),
            "Majority validator failed to root something > {} in {} secs,
            last majority validator vote: {},",
            fork_slot + fork_length + majority_fork_buffer,
            elapsed.as_secs(),
            last_majority_vote,
        );
        sleep(Duration::from_millis(100));

        if let Some(last_root) = last_root_in_tower(&majority_ledger_path, &majority_pubkey) {
            last_majority_root = last_root;
        }
    }

    let last_majority_vote =
        wait_for_last_vote_in_tower_to_land_in_ledger(&majority_ledger_path, &majority_pubkey);

    // Steps 7-8: build our node's ledger = majority ledger + minority blocks,
    // then re-parent the newest majority block onto the minority (pruned) tip
    // to simulate a duplicate block built off the pruned branch.
    info!("Creating duplicate block built off of pruned branch for our node. Last majority vote {last_majority_vote}, Last minority vote {last_minority_vote}");
    {
        {
            // Copy majority fork
            std::fs::remove_dir_all(&our_node_info.info.ledger_path).unwrap();
            let mut opt = fs_extra::dir::CopyOptions::new();
            opt.copy_inside = true;
            fs_extra::dir::copy(&majority_ledger_path, &our_node_ledger_path, &opt).unwrap();
            remove_tower(&our_node_ledger_path, &majority_pubkey);
        }

        // Copy minority fork. Rewind our root so that we can copy over the purged bank
        let minority_blockstore = open_blockstore(&minority_validator_info.info.ledger_path);
        let mut our_blockstore = open_blockstore(&our_node_info.info.ledger_path);
        our_blockstore.set_last_root(fork_slot - 1);
        copy_blocks(last_minority_vote, &minority_blockstore, &our_blockstore);

        // Change last block parent to chain off of (purged) minority fork
        info!("For our node, changing parent of {last_majority_vote} to {last_minority_vote}");
        purge_slots_with_count(&our_blockstore, last_majority_vote, 1);
        our_blockstore.add_tree(
            tr(last_minority_vote) / tr(last_majority_vote),
            false,
            true,
            64,
            Hash::default(),
        );

        // Update the root to set minority fork back as pruned
        our_blockstore.set_last_root(fork_slot + fork_length);
    }

    // Actual test, `our_node` will replay the minority fork, then the majority fork which will
    // prune the minority fork. Then finally the problematic block will be skipped (not replayed)
    // because its parent has been pruned from bank forks. Meanwhile the majority validator has
    // continued making blocks and voting, duplicate confirming everything. This will cause the
    // pruned fork to become popular triggering an ancestor hashes repair, eventually allowing our
    // node to dump & repair & continue making roots.
    info!("Restarting our node, verifying that our node is making roots past the duplicate block");
    cluster.restart_node(
        &our_node_pubkey,
        our_node_info,
        SocketAddrSpace::Unspecified,
    );

    cluster_tests::check_for_new_roots(
        16,
        &[cluster.get_contact_info(&our_node_pubkey).unwrap().clone()],
        &cluster.connection_cache,
        "test_duplicate_with_pruned_ancestor",
    );
}

View File

@ -44,6 +44,11 @@ impl EpochStakes {
self.total_stake
}
/// For tests only: directly overwrite this epoch's recorded total stake.
pub fn set_total_stake(&mut self, total_stake: u64) {
    self.total_stake = total_stake;
}
pub fn node_id_to_vote_accounts(&self) -> &Arc<NodeIdToVoteAccounts> {
&self.node_id_to_vote_accounts
}