Plumb signal from replay to ancestor hashes service (#18880)

This commit is contained in:
carllin 2021-07-26 20:59:00 -07:00 committed by GitHub
parent eaeeffa5a3
commit c0704d4ec9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 130 additions and 11 deletions

View File

@ -6,6 +6,7 @@ use crate::{
result::{Error, Result}, result::{Error, Result},
serve_repair::AncestorHashesRepairType, serve_repair::AncestorHashesRepairType,
}; };
use crossbeam_channel::{Receiver, Sender};
use dashmap::{mapref::entry::Entry::Occupied, DashMap}; use dashmap::{mapref::entry::Entry::Occupied, DashMap};
use solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE}; use solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE};
use solana_measure::measure::Measure; use solana_measure::measure::Measure;
@ -29,7 +30,16 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
#[derive(Debug, PartialEq)]
pub enum AncestorHashesReplayUpdate {
Dead(Slot),
DeadDuplicateConfirmed(Slot),
}
pub const MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND: usize = 2; pub const MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND: usize = 2;
pub type AncestorHashesReplayUpdateSender = Sender<AncestorHashesReplayUpdate>;
pub type AncestorHashesReplayUpdateReceiver = Receiver<AncestorHashesReplayUpdate>;
type OutstandingAncestorHashesRepairs = OutstandingRequests<AncestorHashesRepairType>; type OutstandingAncestorHashesRepairs = OutstandingRequests<AncestorHashesRepairType>;
#[derive(Default)] #[derive(Default)]
@ -114,6 +124,7 @@ impl AncestorHashesService {
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
ancestor_hashes_request_socket: Arc<UdpSocket>, ancestor_hashes_request_socket: Arc<UdpSocket>,
repair_info: RepairInfo, repair_info: RepairInfo,
_ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
) -> Self { ) -> Self {
let outstanding_requests: Arc<RwLock<OutstandingAncestorHashesRepairs>> = let outstanding_requests: Arc<RwLock<OutstandingAncestorHashesRepairs>> =
Arc::new(RwLock::new(OutstandingAncestorHashesRepairs::default())); Arc::new(RwLock::new(OutstandingAncestorHashesRepairs::default()));

View File

@ -1,4 +1,8 @@
use crate::{fork_choice::ForkChoice, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice}; use crate::{
ancestor_hashes_service::{AncestorHashesReplayUpdate, AncestorHashesReplayUpdateSender},
fork_choice::ForkChoice,
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
};
use solana_ledger::blockstore::Blockstore; use solana_ledger::blockstore::Blockstore;
use solana_sdk::{clock::Slot, hash::Hash}; use solana_sdk::{clock::Slot, hash::Hash};
use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::collections::{BTreeMap, BTreeSet, HashSet};
@ -220,6 +224,7 @@ pub enum ResultingStateChange {
RepairDuplicateConfirmedVersion(Hash), RepairDuplicateConfirmedVersion(Hash),
// Hash of our current frozen version of the slot // Hash of our current frozen version of the slot
DuplicateConfirmedSlotMatchesCluster(Hash), DuplicateConfirmedSlotMatchesCluster(Hash),
SendAncestorHashesReplayUpdate(AncestorHashesReplayUpdate),
} }
impl SlotStateUpdate { impl SlotStateUpdate {
@ -294,6 +299,9 @@ fn on_dead_slot(slot: Slot, dead_state: DeadState) -> Vec<ResultingStateChange>
// check if our version agrees with the cluster, // check if our version agrees with the cluster,
let bank_hash = Hash::default(); let bank_hash = Hash::default();
let is_dead = true; let is_dead = true;
state_changes.push(ResultingStateChange::SendAncestorHashesReplayUpdate(
AncestorHashesReplayUpdate::DeadDuplicateConfirmed(slot),
));
check_duplicate_confirmed_hash_against_frozen_hash( check_duplicate_confirmed_hash_against_frozen_hash(
&mut state_changes, &mut state_changes,
slot, slot,
@ -301,9 +309,14 @@ fn on_dead_slot(slot: Slot, dead_state: DeadState) -> Vec<ResultingStateChange>
bank_hash, bank_hash,
is_dead, is_dead,
); );
} else if is_slot_duplicate { } else {
state_changes.push(ResultingStateChange::SendAncestorHashesReplayUpdate(
AncestorHashesReplayUpdate::Dead(slot),
));
if is_slot_duplicate {
state_changes.push(ResultingStateChange::MarkSlotDuplicate(Hash::default())); state_changes.push(ResultingStateChange::MarkSlotDuplicate(Hash::default()));
} }
}
state_changes state_changes
} }
@ -351,9 +364,14 @@ fn on_duplicate_confirmed(
} }
let bank_hash = bank_status.bank_hash().expect("bank hash must exist"); let bank_hash = bank_status.bank_hash().expect("bank hash must exist");
let is_dead = bank_status.is_dead();
let mut state_changes = vec![]; let mut state_changes = vec![];
let is_dead = bank_status.is_dead();
if is_dead {
state_changes.push(ResultingStateChange::SendAncestorHashesReplayUpdate(
AncestorHashesReplayUpdate::DeadDuplicateConfirmed(slot),
));
}
check_duplicate_confirmed_hash_against_frozen_hash( check_duplicate_confirmed_hash_against_frozen_hash(
&mut state_changes, &mut state_changes,
slot, slot,
@ -460,6 +478,7 @@ fn apply_state_changes(
fork_choice: &mut HeaviestSubtreeForkChoice, fork_choice: &mut HeaviestSubtreeForkChoice,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
blockstore: &Blockstore, blockstore: &Blockstore,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
state_changes: Vec<ResultingStateChange>, state_changes: Vec<ResultingStateChange>,
) { ) {
// Handle cases where the bank is frozen, but not duplicate confirmed // Handle cases where the bank is frozen, but not duplicate confirmed
@ -494,6 +513,9 @@ fn apply_state_changes(
) )
.unwrap(); .unwrap();
} }
ResultingStateChange::SendAncestorHashesReplayUpdate(ancestor_hashes_replay_update) => {
let _ = ancestor_hashes_replay_update_sender.send(ancestor_hashes_replay_update);
}
} }
} }
@ -510,6 +532,7 @@ pub(crate) fn check_slot_agrees_with_cluster(
duplicate_slots_tracker: &mut DuplicateSlotsTracker, duplicate_slots_tracker: &mut DuplicateSlotsTracker,
fork_choice: &mut HeaviestSubtreeForkChoice, fork_choice: &mut HeaviestSubtreeForkChoice,
duplicate_slots_to_repair: &mut HashSet<(Slot, Hash)>, duplicate_slots_to_repair: &mut HashSet<(Slot, Hash)>,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
slot_state_update: SlotStateUpdate, slot_state_update: SlotStateUpdate,
) { ) {
info!( info!(
@ -533,12 +556,23 @@ pub(crate) fn check_slot_agrees_with_cluster(
} }
} }
// Avoid duplicate work from multiple of the same DuplicateConfirmed signal. This can
// happen if we get duplicate confirmed from gossip and from local replay.
if let SlotStateUpdate::DuplicateConfirmed(state) = &slot_state_update {
if let Some(bank_hash) = state.bank_status.bank_hash() {
if let Some(true) = fork_choice.is_duplicate_confirmed(&(slot, bank_hash)) {
return;
}
}
}
let state_changes = slot_state_update.into_state_changes(slot); let state_changes = slot_state_update.into_state_changes(slot);
apply_state_changes( apply_state_changes(
slot, slot,
fork_choice, fork_choice,
duplicate_slots_to_repair, duplicate_slots_to_repair,
blockstore, blockstore,
ancestor_hashes_replay_update_sender,
state_changes, state_changes,
); );
} }
@ -547,6 +581,7 @@ pub(crate) fn check_slot_agrees_with_cluster(
mod test { mod test {
use super::*; use super::*;
use crate::{progress_map::ProgressMap, replay_stage::tests::setup_forks_from_tree}; use crate::{progress_map::ProgressMap, replay_stage::tests::setup_forks_from_tree};
use crossbeam_channel::unbounded;
use solana_runtime::bank_forks::BankForks; use solana_runtime::bank_forks::BankForks;
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
@ -686,6 +721,7 @@ mod test {
( (
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
vec![ vec![
ResultingStateChange::SendAncestorHashesReplayUpdate(AncestorHashesReplayUpdate::DeadDuplicateConfirmed(10)),
ResultingStateChange::MarkSlotDuplicate(Hash::default()), ResultingStateChange::MarkSlotDuplicate(Hash::default()),
ResultingStateChange::RepairDuplicateConfirmedVersion(duplicate_confirmed_hash)], ResultingStateChange::RepairDuplicateConfirmedVersion(duplicate_confirmed_hash)],
) )
@ -727,7 +763,9 @@ mod test {
); );
( (
SlotStateUpdate::Dead(dead_state), SlotStateUpdate::Dead(dead_state),
Vec::<ResultingStateChange>::new() vec![
ResultingStateChange::SendAncestorHashesReplayUpdate(AncestorHashesReplayUpdate::Dead(10))
],
) )
}, },
dead_state_update_1: { dead_state_update_1: {
@ -739,7 +777,8 @@ mod test {
); );
( (
SlotStateUpdate::Dead(dead_state), SlotStateUpdate::Dead(dead_state),
vec![ResultingStateChange::MarkSlotDuplicate(Hash::default())], vec![
ResultingStateChange::SendAncestorHashesReplayUpdate(AncestorHashesReplayUpdate::Dead(10)), ResultingStateChange::MarkSlotDuplicate(Hash::default())],
) )
}, },
dead_state_update_2: { dead_state_update_2: {
@ -752,6 +791,7 @@ mod test {
( (
SlotStateUpdate::Dead(dead_state), SlotStateUpdate::Dead(dead_state),
vec![ vec![
ResultingStateChange::SendAncestorHashesReplayUpdate(AncestorHashesReplayUpdate::DeadDuplicateConfirmed(10)),
ResultingStateChange::MarkSlotDuplicate(Hash::default()), ResultingStateChange::MarkSlotDuplicate(Hash::default()),
ResultingStateChange::RepairDuplicateConfirmedVersion(duplicate_confirmed_hash.unwrap())], ResultingStateChange::RepairDuplicateConfirmedVersion(duplicate_confirmed_hash.unwrap())],
) )
@ -766,6 +806,7 @@ mod test {
( (
SlotStateUpdate::Dead(dead_state), SlotStateUpdate::Dead(dead_state),
vec![ vec![
ResultingStateChange::SendAncestorHashesReplayUpdate(AncestorHashesReplayUpdate::DeadDuplicateConfirmed(10)),
ResultingStateChange::MarkSlotDuplicate(Hash::default()), ResultingStateChange::MarkSlotDuplicate(Hash::default()),
ResultingStateChange::RepairDuplicateConfirmedVersion(duplicate_confirmed_hash.unwrap())], ResultingStateChange::RepairDuplicateConfirmedVersion(duplicate_confirmed_hash.unwrap())],
) )
@ -888,11 +929,14 @@ mod test {
.get(duplicate_slot) .get(duplicate_slot)
.unwrap() .unwrap()
.hash(); .hash();
let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) =
unbounded();
apply_state_changes( apply_state_changes(
duplicate_slot, duplicate_slot,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&blockstore, &blockstore,
&ancestor_hashes_replay_update_sender,
vec![ResultingStateChange::MarkSlotDuplicate(duplicate_slot_hash)], vec![ResultingStateChange::MarkSlotDuplicate(duplicate_slot_hash)],
); );
assert!(!heaviest_subtree_fork_choice assert!(!heaviest_subtree_fork_choice
@ -926,6 +970,7 @@ mod test {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&blockstore, &blockstore,
&ancestor_hashes_replay_update_sender,
vec![ResultingStateChange::RepairDuplicateConfirmedVersion( vec![ResultingStateChange::RepairDuplicateConfirmedVersion(
correct_hash, correct_hash,
)], )],
@ -956,11 +1001,14 @@ mod test {
// Simulate ReplayStage freezing a Bank with the given hash. // Simulate ReplayStage freezing a Bank with the given hash.
// BankFrozen should mark it down in Blockstore. // BankFrozen should mark it down in Blockstore.
assert!(blockstore.get_bank_hash(duplicate_slot).is_none()); assert!(blockstore.get_bank_hash(duplicate_slot).is_none());
let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) =
unbounded();
apply_state_changes( apply_state_changes(
duplicate_slot, duplicate_slot,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&blockstore, &blockstore,
&ancestor_hashes_replay_update_sender,
vec![ResultingStateChange::BankFrozen(duplicate_slot_hash)], vec![ResultingStateChange::BankFrozen(duplicate_slot_hash)],
); );
assert_eq!( assert_eq!(
@ -983,6 +1031,7 @@ mod test {
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&blockstore, &blockstore,
&ancestor_hashes_replay_update_sender,
vec![ResultingStateChange::BankFrozen(new_bank_hash)], vec![ResultingStateChange::BankFrozen(new_bank_hash)],
); );
assert_eq!( assert_eq!(
@ -1024,11 +1073,14 @@ mod test {
our_duplicate_slot_hash, our_duplicate_slot_hash,
)]; )];
modify_state_changes(our_duplicate_slot_hash, &mut state_changes); modify_state_changes(our_duplicate_slot_hash, &mut state_changes);
let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) =
unbounded();
apply_state_changes( apply_state_changes(
duplicate_slot, duplicate_slot,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&blockstore, &blockstore,
&ancestor_hashes_replay_update_sender,
state_changes, state_changes,
); );
for child_slot in descendants for child_slot in descendants
@ -1096,6 +1148,8 @@ mod test {
|| progress.is_dead(duplicate_slot).unwrap_or(false), || progress.is_dead(duplicate_slot).unwrap_or(false),
|| initial_bank_hash, || initial_bank_hash,
); );
let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) =
unbounded();
check_slot_agrees_with_cluster( check_slot_agrees_with_cluster(
duplicate_slot, duplicate_slot,
root, root,
@ -1103,6 +1157,7 @@ mod test {
&mut duplicate_slots_tracker, &mut duplicate_slots_tracker,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
assert!(duplicate_slots_tracker.contains(&duplicate_slot)); assert!(duplicate_slots_tracker.contains(&duplicate_slot));
@ -1135,6 +1190,7 @@ mod test {
&mut duplicate_slots_tracker, &mut duplicate_slots_tracker,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
SlotStateUpdate::BankFrozen(bank_frozen_state), SlotStateUpdate::BankFrozen(bank_frozen_state),
); );
@ -1194,6 +1250,8 @@ mod test {
|| progress.is_dead(2).unwrap_or(false), || progress.is_dead(2).unwrap_or(false),
|| Some(slot2_hash), || Some(slot2_hash),
); );
let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) =
unbounded();
check_slot_agrees_with_cluster( check_slot_agrees_with_cluster(
2, 2,
root, root,
@ -1201,6 +1259,7 @@ mod test {
&mut duplicate_slots_tracker, &mut duplicate_slots_tracker,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
assert!(heaviest_subtree_fork_choice assert!(heaviest_subtree_fork_choice
@ -1236,6 +1295,7 @@ mod test {
&mut duplicate_slots_tracker, &mut duplicate_slots_tracker,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
assert!(duplicate_slots_tracker.contains(&3)); assert!(duplicate_slots_tracker.contains(&3));
@ -1295,6 +1355,8 @@ mod test {
|| progress.is_dead(2).unwrap_or(false), || progress.is_dead(2).unwrap_or(false),
|| Some(slot2_hash), || Some(slot2_hash),
); );
let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) =
unbounded();
check_slot_agrees_with_cluster( check_slot_agrees_with_cluster(
2, 2,
root, root,
@ -1302,6 +1364,7 @@ mod test {
&mut duplicate_slots_tracker, &mut duplicate_slots_tracker,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
assert!(duplicate_slots_tracker.contains(&2)); assert!(duplicate_slots_tracker.contains(&2));
@ -1335,6 +1398,7 @@ mod test {
&mut duplicate_slots_tracker, &mut duplicate_slots_tracker,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
for slot in 0..=3 { for slot in 0..=3 {
@ -1380,6 +1444,8 @@ mod test {
|| progress.is_dead(3).unwrap_or(false), || progress.is_dead(3).unwrap_or(false),
|| Some(slot3_hash), || Some(slot3_hash),
); );
let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) =
unbounded();
check_slot_agrees_with_cluster( check_slot_agrees_with_cluster(
3, 3,
root, root,
@ -1387,6 +1453,7 @@ mod test {
&mut duplicate_slots_tracker, &mut duplicate_slots_tracker,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
let verify_all_slots_duplicate_confirmed = let verify_all_slots_duplicate_confirmed =
@ -1426,6 +1493,7 @@ mod test {
&mut duplicate_slots_tracker, &mut duplicate_slots_tracker,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
assert!(duplicate_slots_tracker.contains(&1)); assert!(duplicate_slots_tracker.contains(&1));

View File

@ -1,7 +1,7 @@
//! The `repair_service` module implements the tools necessary to generate a thread which //! The `repair_service` module implements the tools necessary to generate a thread which
//! regularly finds missing shreds in the ledger and sends repair requests for those shreds //! regularly finds missing shreds in the ledger and sends repair requests for those shreds
use crate::{ use crate::{
ancestor_hashes_service::AncestorHashesService, ancestor_hashes_service::{AncestorHashesReplayUpdateReceiver, AncestorHashesService},
cluster_info_vote_listener::VerifiedVoteReceiver, cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_slots::ClusterSlots, cluster_slots::ClusterSlots,
duplicate_repair_status::DuplicateSlotRepairStatus, duplicate_repair_status::DuplicateSlotRepairStatus,
@ -155,6 +155,7 @@ impl RepairService {
repair_info: RepairInfo, repair_info: RepairInfo,
verified_vote_receiver: VerifiedVoteReceiver, verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>, outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
) -> Self { ) -> Self {
let t_repair = { let t_repair = {
let blockstore = blockstore.clone(); let blockstore = blockstore.clone();
@ -181,6 +182,7 @@ impl RepairService {
blockstore, blockstore,
ancestor_hashes_request_socket, ancestor_hashes_request_socket,
repair_info, repair_info,
ancestor_hashes_replay_update_receiver,
); );
RepairService { RepairService {

View File

@ -1,6 +1,7 @@
//! The `replay_stage` replays transactions broadcast by the leader. //! The `replay_stage` replays transactions broadcast by the leader.
use crate::{ use crate::{
ancestor_hashes_service::AncestorHashesReplayUpdateSender,
broadcast_stage::RetransmitSlotsSender, broadcast_stage::RetransmitSlotsSender,
cache_block_meta_service::CacheBlockMetaSender, cache_block_meta_service::CacheBlockMetaSender,
cluster_info_vote_listener::{ cluster_info_vote_listener::{
@ -126,6 +127,7 @@ pub struct ReplayStageConfig {
pub cache_block_meta_sender: Option<CacheBlockMetaSender>, pub cache_block_meta_sender: Option<CacheBlockMetaSender>,
pub bank_notification_sender: Option<BankNotificationSender>, pub bank_notification_sender: Option<BankNotificationSender>,
pub wait_for_vote_to_start_leader: bool, pub wait_for_vote_to_start_leader: bool,
pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender,
} }
#[derive(Default)] #[derive(Default)]
@ -333,6 +335,7 @@ impl ReplayStage {
cache_block_meta_sender, cache_block_meta_sender,
bank_notification_sender, bank_notification_sender,
wait_for_vote_to_start_leader, wait_for_vote_to_start_leader,
ancestor_hashes_replay_update_sender,
} = config; } = config;
trace!("replay stage"); trace!("replay stage");
@ -417,7 +420,8 @@ impl ReplayStage {
&mut latest_validator_votes_for_frozen_banks, &mut latest_validator_votes_for_frozen_banks,
&cluster_slots_update_sender, &cluster_slots_update_sender,
&cost_update_sender, &cost_update_sender,
&mut duplicate_slots_to_repair &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
); );
replay_active_banks_time.stop(); replay_active_banks_time.stop();
@ -433,7 +437,8 @@ impl ReplayStage {
&bank_forks, &bank_forks,
&mut progress, &mut progress,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
); );
process_gossip_duplicate_confirmed_slots_time.stop(); process_gossip_duplicate_confirmed_slots_time.stop();
@ -463,7 +468,8 @@ impl ReplayStage {
&bank_forks, &bank_forks,
&mut progress, &mut progress,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
); );
} }
process_duplicate_slots_time.stop(); process_duplicate_slots_time.stop();
@ -505,7 +511,7 @@ impl ReplayStage {
&bank_forks, &bank_forks,
); );
Self::mark_slots_confirmed(&confirmed_forks, &blockstore, &bank_forks, &mut progress, &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair); Self::mark_slots_confirmed(&confirmed_forks, &blockstore, &bank_forks, &mut progress, &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, &ancestor_hashes_replay_update_sender);
} }
compute_slot_stats_time.stop(); compute_slot_stats_time.stop();
@ -1075,6 +1081,7 @@ impl ReplayStage {
progress: &mut ProgressMap, progress: &mut ProgressMap,
fork_choice: &mut HeaviestSubtreeForkChoice, fork_choice: &mut HeaviestSubtreeForkChoice,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
) { ) {
let root = bank_forks.read().unwrap().root(); let root = bank_forks.read().unwrap().root();
for new_confirmed_slots in gossip_duplicate_confirmed_slots_receiver.try_iter() { for new_confirmed_slots in gossip_duplicate_confirmed_slots_receiver.try_iter() {
@ -1101,6 +1108,7 @@ impl ReplayStage {
duplicate_slots_tracker, duplicate_slots_tracker,
fork_choice, fork_choice,
duplicate_slots_to_repair, duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
} }
@ -1136,6 +1144,7 @@ impl ReplayStage {
progress: &mut ProgressMap, progress: &mut ProgressMap,
fork_choice: &mut HeaviestSubtreeForkChoice, fork_choice: &mut HeaviestSubtreeForkChoice,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
) { ) {
let new_duplicate_slots: Vec<Slot> = duplicate_slots_receiver.try_iter().collect(); let new_duplicate_slots: Vec<Slot> = duplicate_slots_receiver.try_iter().collect();
let (root_slot, bank_hashes) = { let (root_slot, bank_hashes) = {
@ -1165,6 +1174,7 @@ impl ReplayStage {
duplicate_slots_tracker, duplicate_slots_tracker,
fork_choice, fork_choice,
duplicate_slots_to_repair, duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
} }
@ -1413,6 +1423,7 @@ impl ReplayStage {
progress: &mut ProgressMap, progress: &mut ProgressMap,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
) { ) {
// Do not remove from progress map when marking dead! Needed by // Do not remove from progress map when marking dead! Needed by
// `process_gossip_duplicate_confirmed_slots()` // `process_gossip_duplicate_confirmed_slots()`
@ -1460,6 +1471,7 @@ impl ReplayStage {
duplicate_slots_tracker, duplicate_slots_tracker,
heaviest_subtree_fork_choice, heaviest_subtree_fork_choice,
duplicate_slots_to_repair, duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
SlotStateUpdate::Dead(dead_state), SlotStateUpdate::Dead(dead_state),
); );
} }
@ -1859,6 +1871,7 @@ impl ReplayStage {
cluster_slots_update_sender: &ClusterSlotsUpdateSender, cluster_slots_update_sender: &ClusterSlotsUpdateSender,
cost_update_sender: &Sender<ExecuteTimings>, cost_update_sender: &Sender<ExecuteTimings>,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
) -> bool { ) -> bool {
let mut did_complete_bank = false; let mut did_complete_bank = false;
let mut tx_count = 0; let mut tx_count = 0;
@ -1926,6 +1939,7 @@ impl ReplayStage {
progress, progress,
heaviest_subtree_fork_choice, heaviest_subtree_fork_choice,
duplicate_slots_to_repair, duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
); );
// If the bank was corrupted, don't try to run the below logic to check if the // If the bank was corrupted, don't try to run the below logic to check if the
// bank is completed // bank is completed
@ -1974,6 +1988,7 @@ impl ReplayStage {
duplicate_slots_tracker, duplicate_slots_tracker,
heaviest_subtree_fork_choice, heaviest_subtree_fork_choice,
duplicate_slots_to_repair, duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
SlotStateUpdate::BankFrozen(bank_frozen_state), SlotStateUpdate::BankFrozen(bank_frozen_state),
); );
if let Some(sender) = bank_notification_sender { if let Some(sender) = bank_notification_sender {
@ -2490,6 +2505,7 @@ impl ReplayStage {
duplicate_slots_tracker: &mut DuplicateSlotsTracker, duplicate_slots_tracker: &mut DuplicateSlotsTracker,
fork_choice: &mut HeaviestSubtreeForkChoice, fork_choice: &mut HeaviestSubtreeForkChoice,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
) { ) {
let root_slot = bank_forks.read().unwrap().root(); let root_slot = bank_forks.read().unwrap().root();
for (slot, frozen_hash) in confirmed_forks.iter() { for (slot, frozen_hash) in confirmed_forks.iter() {
@ -2515,6 +2531,7 @@ impl ReplayStage {
duplicate_slots_tracker, duplicate_slots_tracker,
fork_choice, fork_choice,
duplicate_slots_to_repair, duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
} }
@ -3393,6 +3410,8 @@ pub mod tests {
block_commitment_cache, block_commitment_cache,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
)); ));
let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) =
unbounded();
if let Err(err) = &res { if let Err(err) = &res {
ReplayStage::mark_dead_slot( ReplayStage::mark_dead_slot(
&blockstore, &blockstore,
@ -3405,6 +3424,7 @@ pub mod tests {
&mut progress, &mut progress,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
); );
} }
@ -4842,6 +4862,8 @@ pub mod tests {
|| progress.is_dead(4).unwrap_or(false), || progress.is_dead(4).unwrap_or(false),
|| Some(bank4_hash), || Some(bank4_hash),
); );
let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) =
unbounded();
check_slot_agrees_with_cluster( check_slot_agrees_with_cluster(
4, 4,
bank_forks.read().unwrap().root(), bank_forks.read().unwrap().root(),
@ -4849,6 +4871,7 @@ pub mod tests {
&mut duplicate_slots_tracker, &mut duplicate_slots_tracker,
&mut vote_simulator.heaviest_subtree_fork_choice, &mut vote_simulator.heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
@ -4880,6 +4903,7 @@ pub mod tests {
&mut duplicate_slots_tracker, &mut duplicate_slots_tracker,
&mut vote_simulator.heaviest_subtree_fork_choice, &mut vote_simulator.heaviest_subtree_fork_choice,
&mut DuplicateSlotsToRepair::default(), &mut DuplicateSlotsToRepair::default(),
&ancestor_hashes_replay_update_sender,
SlotStateUpdate::Duplicate(duplicate_state), SlotStateUpdate::Duplicate(duplicate_state),
); );
@ -4912,6 +4936,7 @@ pub mod tests {
&mut duplicate_slots_tracker, &mut duplicate_slots_tracker,
&mut vote_simulator.heaviest_subtree_fork_choice, &mut vote_simulator.heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
// The confirmed hash is detected in `progress`, which means // The confirmed hash is detected in `progress`, which means
@ -5048,6 +5073,8 @@ pub mod tests {
|| progress.is_dead(2).unwrap_or(false), || progress.is_dead(2).unwrap_or(false),
|| Some(our_bank2_hash), || Some(our_bank2_hash),
); );
let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) =
unbounded();
check_slot_agrees_with_cluster( check_slot_agrees_with_cluster(
2, 2,
bank_forks.read().unwrap().root(), bank_forks.read().unwrap().root(),
@ -5055,6 +5082,7 @@ pub mod tests {
&mut duplicate_slots_tracker, &mut duplicate_slots_tracker,
heaviest_subtree_fork_choice, heaviest_subtree_fork_choice,
&mut duplicate_slots_to_repair, &mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state),
); );
assert!(duplicate_slots_to_repair.contains(&(2, duplicate_confirmed_bank2_hash))); assert!(duplicate_slots_to_repair.contains(&(2, duplicate_confirmed_bank2_hash)));

View File

@ -2,6 +2,7 @@
#![allow(clippy::rc_buffer)] #![allow(clippy::rc_buffer)]
use crate::{ use crate::{
ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
cluster_info_vote_listener::VerifiedVoteReceiver, cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_nodes::ClusterNodes, cluster_nodes::ClusterNodes,
cluster_slots::ClusterSlots, cluster_slots::ClusterSlots,
@ -545,6 +546,7 @@ impl RetransmitStage {
max_slots: &Arc<MaxSlots>, max_slots: &Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>, rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
duplicate_slots_sender: Sender<Slot>, duplicate_slots_sender: Sender<Slot>,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
) -> Self { ) -> Self {
let (retransmit_sender, retransmit_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel();
@ -603,6 +605,7 @@ impl RetransmitStage {
verified_vote_receiver, verified_vote_receiver,
completed_data_sets_sender, completed_data_sets_sender,
duplicate_slots_sender, duplicate_slots_sender,
ancestor_hashes_replay_update_receiver,
); );
Self { Self {

View File

@ -169,6 +169,8 @@ impl Tvu {
let max_compaction_jitter = tvu_config.rocksdb_max_compaction_jitter; let max_compaction_jitter = tvu_config.rocksdb_max_compaction_jitter;
let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded(); let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded();
let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded(); let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded();
let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
unbounded();
let retransmit_stage = RetransmitStage::new( let retransmit_stage = RetransmitStage::new(
bank_forks.clone(), bank_forks.clone(),
leader_schedule_cache, leader_schedule_cache,
@ -190,6 +192,7 @@ impl Tvu {
max_slots, max_slots,
Some(rpc_subscriptions.clone()), Some(rpc_subscriptions.clone()),
duplicate_slots_sender, duplicate_slots_sender,
ancestor_hashes_replay_update_receiver,
); );
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
@ -273,6 +276,7 @@ impl Tvu {
cache_block_meta_sender, cache_block_meta_sender,
bank_notification_sender, bank_notification_sender,
wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader, wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
ancestor_hashes_replay_update_sender,
}; };
let (voting_sender, voting_receiver) = channel(); let (voting_sender, voting_receiver) = channel();

View File

@ -2,6 +2,7 @@
//! blockstore and retransmitting where required //! blockstore and retransmitting where required
//! //!
use crate::{ use crate::{
ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
cluster_info_vote_listener::VerifiedVoteReceiver, cluster_info_vote_listener::VerifiedVoteReceiver,
completed_data_sets_service::CompletedDataSetsSender, completed_data_sets_service::CompletedDataSetsSender,
outstanding_requests::OutstandingRequests, outstanding_requests::OutstandingRequests,
@ -341,6 +342,7 @@ impl WindowService {
verified_vote_receiver: VerifiedVoteReceiver, verified_vote_receiver: VerifiedVoteReceiver,
completed_data_sets_sender: CompletedDataSetsSender, completed_data_sets_sender: CompletedDataSetsSender,
duplicate_slots_sender: DuplicateSlotSender, duplicate_slots_sender: DuplicateSlotSender,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
) -> WindowService ) -> WindowService
where where
F: 'static F: 'static
@ -362,6 +364,7 @@ impl WindowService {
repair_info, repair_info,
verified_vote_receiver, verified_vote_receiver,
outstanding_requests.clone(), outstanding_requests.clone(),
ancestor_hashes_replay_update_receiver,
); );
let (insert_sender, insert_receiver) = unbounded(); let (insert_sender, insert_receiver) = unbounded();