diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index 058d6b085..ed722ea0a 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -6,6 +6,7 @@ use crate::{ result::{Error, Result}, serve_repair::AncestorHashesRepairType, }; +use crossbeam_channel::{Receiver, Sender}; use dashmap::{mapref::entry::Entry::Occupied, DashMap}; use solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE}; use solana_measure::measure::Measure; @@ -29,7 +30,16 @@ use std::{ time::{Duration, Instant}, }; +#[derive(Debug, PartialEq)] +pub enum AncestorHashesReplayUpdate { + Dead(Slot), + DeadDuplicateConfirmed(Slot), +} + pub const MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND: usize = 2; + +pub type AncestorHashesReplayUpdateSender = Sender; +pub type AncestorHashesReplayUpdateReceiver = Receiver; type OutstandingAncestorHashesRepairs = OutstandingRequests; #[derive(Default)] @@ -114,6 +124,7 @@ impl AncestorHashesService { blockstore: Arc, ancestor_hashes_request_socket: Arc, repair_info: RepairInfo, + _ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, ) -> Self { let outstanding_requests: Arc> = Arc::new(RwLock::new(OutstandingAncestorHashesRepairs::default())); diff --git a/core/src/cluster_slot_state_verifier.rs b/core/src/cluster_slot_state_verifier.rs index 3ba172bd0..422a37b48 100644 --- a/core/src/cluster_slot_state_verifier.rs +++ b/core/src/cluster_slot_state_verifier.rs @@ -1,4 +1,8 @@ -use crate::{fork_choice::ForkChoice, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice}; +use crate::{ + ancestor_hashes_service::{AncestorHashesReplayUpdate, AncestorHashesReplayUpdateSender}, + fork_choice::ForkChoice, + heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, +}; use solana_ledger::blockstore::Blockstore; use solana_sdk::{clock::Slot, hash::Hash}; use std::collections::{BTreeMap, BTreeSet, HashSet}; @@ -220,6 +224,7 @@ pub enum ResultingStateChange { RepairDuplicateConfirmedVersion(Hash), // Hash of our current frozen version of the slot DuplicateConfirmedSlotMatchesCluster(Hash), + SendAncestorHashesReplayUpdate(AncestorHashesReplayUpdate), } impl SlotStateUpdate { @@ -294,6 +299,9 @@ fn on_dead_slot(slot: Slot, dead_state: DeadState) -> Vec // check if our version agrees with the cluster, let bank_hash = Hash::default(); let is_dead = true; + state_changes.push(ResultingStateChange::SendAncestorHashesReplayUpdate( + AncestorHashesReplayUpdate::DeadDuplicateConfirmed(slot), + )); check_duplicate_confirmed_hash_against_frozen_hash( &mut state_changes, slot, @@ -301,8 +309,13 @@ fn on_dead_slot(slot: Slot, dead_state: DeadState) -> Vec bank_hash, is_dead, ); - } else if is_slot_duplicate { - state_changes.push(ResultingStateChange::MarkSlotDuplicate(Hash::default())); + } else { + state_changes.push(ResultingStateChange::SendAncestorHashesReplayUpdate( + AncestorHashesReplayUpdate::Dead(slot), + )); + if is_slot_duplicate { + state_changes.push(ResultingStateChange::MarkSlotDuplicate(Hash::default())); + } } state_changes @@ -351,9 +364,14 @@ fn on_duplicate_confirmed( } let bank_hash = bank_status.bank_hash().expect("bank hash must exist"); - let is_dead = bank_status.is_dead(); let mut state_changes = vec![]; + let is_dead = bank_status.is_dead(); + if is_dead { + state_changes.push(ResultingStateChange::SendAncestorHashesReplayUpdate( + AncestorHashesReplayUpdate::DeadDuplicateConfirmed(slot), + )); + } check_duplicate_confirmed_hash_against_frozen_hash( &mut state_changes, slot, @@ -460,6 +478,7 @@ fn apply_state_changes( fork_choice: &mut HeaviestSubtreeForkChoice, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, blockstore: &Blockstore, + ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, state_changes: Vec, ) { // Handle cases where the bank is frozen, but not duplicate confirmed @@ -494,6 +513,9 @@ fn apply_state_changes( ) .unwrap(); } + ResultingStateChange::SendAncestorHashesReplayUpdate(ancestor_hashes_replay_update) => { + let _ = ancestor_hashes_replay_update_sender.send(ancestor_hashes_replay_update); + } } } @@ -510,6 +532,7 @@ pub(crate) fn check_slot_agrees_with_cluster( duplicate_slots_tracker: &mut DuplicateSlotsTracker, fork_choice: &mut HeaviestSubtreeForkChoice, duplicate_slots_to_repair: &mut HashSet<(Slot, Hash)>, + ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, slot_state_update: SlotStateUpdate, ) { info!( @@ -533,12 +556,23 @@ pub(crate) fn check_slot_agrees_with_cluster( } } + // Avoid duplicate work from multiple of the same DuplicateConfirmed signal. This can + // happen if we get duplicate confirmed from gossip and from local replay. + if let SlotStateUpdate::DuplicateConfirmed(state) = &slot_state_update { + if let Some(bank_hash) = state.bank_status.bank_hash() { + if let Some(true) = fork_choice.is_duplicate_confirmed(&(slot, bank_hash)) { + return; + } + } + } + let state_changes = slot_state_update.into_state_changes(slot); apply_state_changes( slot, fork_choice, duplicate_slots_to_repair, blockstore, + ancestor_hashes_replay_update_sender, state_changes, ); } @@ -547,6 +581,7 @@ pub(crate) fn check_slot_agrees_with_cluster( mod test { use super::*; use crate::{progress_map::ProgressMap, replay_stage::tests::setup_forks_from_tree}; + use crossbeam_channel::unbounded; use solana_runtime::bank_forks::BankForks; use std::{ collections::{HashMap, HashSet}, @@ -686,6 +721,7 @@ mod test { ( SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), vec![ + ResultingStateChange::SendAncestorHashesReplayUpdate(AncestorHashesReplayUpdate::DeadDuplicateConfirmed(10)), ResultingStateChange::MarkSlotDuplicate(Hash::default()), ResultingStateChange::RepairDuplicateConfirmedVersion(duplicate_confirmed_hash)], ) @@ -727,7 +763,9 @@ mod test { ); ( SlotStateUpdate::Dead(dead_state), - Vec::::new() + vec![ + ResultingStateChange::SendAncestorHashesReplayUpdate(AncestorHashesReplayUpdate::Dead(10)) + ], ) }, dead_state_update_1: { @@ -739,7 +777,8 @@ mod test { ); ( SlotStateUpdate::Dead(dead_state), - vec![ResultingStateChange::MarkSlotDuplicate(Hash::default())], + vec![ + ResultingStateChange::SendAncestorHashesReplayUpdate(AncestorHashesReplayUpdate::Dead(10)), ResultingStateChange::MarkSlotDuplicate(Hash::default())], ) }, dead_state_update_2: { @@ -752,6 +791,7 @@ mod test { ( SlotStateUpdate::Dead(dead_state), vec![ + ResultingStateChange::SendAncestorHashesReplayUpdate(AncestorHashesReplayUpdate::DeadDuplicateConfirmed(10)), ResultingStateChange::MarkSlotDuplicate(Hash::default()), ResultingStateChange::RepairDuplicateConfirmedVersion(duplicate_confirmed_hash.unwrap())], ) @@ -766,6 +806,7 @@ mod test { ( SlotStateUpdate::Dead(dead_state), vec![ + ResultingStateChange::SendAncestorHashesReplayUpdate(AncestorHashesReplayUpdate::DeadDuplicateConfirmed(10)), ResultingStateChange::MarkSlotDuplicate(Hash::default()), ResultingStateChange::RepairDuplicateConfirmedVersion(duplicate_confirmed_hash.unwrap())], ) @@ -888,11 +929,14 @@ mod test { .get(duplicate_slot) .unwrap() .hash(); + let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) = + unbounded(); apply_state_changes( duplicate_slot, &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, &blockstore, + &ancestor_hashes_replay_update_sender, vec![ResultingStateChange::MarkSlotDuplicate(duplicate_slot_hash)], ); assert!(!heaviest_subtree_fork_choice @@ -926,6 +970,7 @@ mod test { &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, &blockstore, + &ancestor_hashes_replay_update_sender, vec![ResultingStateChange::RepairDuplicateConfirmedVersion( correct_hash, )], @@ -956,11 +1001,14 @@ mod test { // Simulate ReplayStage freezing a Bank with the given hash. // BankFrozen should mark it down in Blockstore. assert!(blockstore.get_bank_hash(duplicate_slot).is_none()); + let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) = + unbounded(); apply_state_changes( duplicate_slot, &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, &blockstore, + &ancestor_hashes_replay_update_sender, vec![ResultingStateChange::BankFrozen(duplicate_slot_hash)], ); assert_eq!( @@ -983,6 +1031,7 @@ mod test { &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, &blockstore, + &ancestor_hashes_replay_update_sender, vec![ResultingStateChange::BankFrozen(new_bank_hash)], ); assert_eq!( @@ -1024,11 +1073,14 @@ mod test { our_duplicate_slot_hash, )]; modify_state_changes(our_duplicate_slot_hash, &mut state_changes); + let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) = + unbounded(); apply_state_changes( duplicate_slot, &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, &blockstore, + &ancestor_hashes_replay_update_sender, state_changes, ); for child_slot in descendants @@ -1096,6 +1148,8 @@ mod test { || progress.is_dead(duplicate_slot).unwrap_or(false), || initial_bank_hash, ); + let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) = + unbounded(); check_slot_agrees_with_cluster( duplicate_slot, root, @@ -1103,6 +1157,7 @@ mod test { &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, + &ancestor_hashes_replay_update_sender, SlotStateUpdate::Duplicate(duplicate_state), ); assert!(duplicate_slots_tracker.contains(&duplicate_slot)); @@ -1135,6 +1190,7 @@ mod test { &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, + &ancestor_hashes_replay_update_sender, SlotStateUpdate::BankFrozen(bank_frozen_state), ); @@ -1194,6 +1250,8 @@ mod test { || progress.is_dead(2).unwrap_or(false), || Some(slot2_hash), ); + let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) = + unbounded(); check_slot_agrees_with_cluster( 2, root, @@ -1201,6 +1259,7 @@ mod test { &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), ); assert!(heaviest_subtree_fork_choice @@ -1236,6 +1295,7 @@ mod test { &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, SlotStateUpdate::Duplicate(duplicate_state), ); assert!(duplicate_slots_tracker.contains(&3)); @@ -1295,6 +1355,8 @@ mod test { || progress.is_dead(2).unwrap_or(false), || Some(slot2_hash), ); + let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) = + unbounded(); check_slot_agrees_with_cluster( 2, root, @@ -1302,6 +1364,7 @@ mod test { &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, SlotStateUpdate::Duplicate(duplicate_state), ); assert!(duplicate_slots_tracker.contains(&2)); @@ -1335,6 +1398,7 @@ mod test { &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), ); for slot in 0..=3 { @@ -1380,6 +1444,8 @@ mod test { || progress.is_dead(3).unwrap_or(false), || Some(slot3_hash), ); + let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) = + unbounded(); check_slot_agrees_with_cluster( 3, root, @@ -1387,6 +1453,7 @@ mod test { &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, + &ancestor_hashes_replay_update_sender, SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), ); let verify_all_slots_duplicate_confirmed = @@ -1426,6 +1493,7 @@ mod test { &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, + &ancestor_hashes_replay_update_sender, SlotStateUpdate::Duplicate(duplicate_state), ); assert!(duplicate_slots_tracker.contains(&1)); diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 2b7b71f40..b3041d6bb 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -1,7 +1,7 @@ //! The `repair_service` module implements the tools necessary to generate a thread which //! regularly finds missing shreds in the ledger and sends repair requests for those shreds use crate::{ - ancestor_hashes_service::AncestorHashesService, + ancestor_hashes_service::{AncestorHashesReplayUpdateReceiver, AncestorHashesService}, cluster_info_vote_listener::VerifiedVoteReceiver, cluster_slots::ClusterSlots, duplicate_repair_status::DuplicateSlotRepairStatus, @@ -155,6 +155,7 @@ impl RepairService { repair_info: RepairInfo, verified_vote_receiver: VerifiedVoteReceiver, outstanding_requests: Arc>, + ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, ) -> Self { let t_repair = { let blockstore = blockstore.clone(); @@ -181,6 +182,7 @@ impl RepairService { blockstore, ancestor_hashes_request_socket, repair_info, + ancestor_hashes_replay_update_receiver, ); RepairService { diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index f5d69e46a..6652f4cf1 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1,6 +1,7 @@ //! The `replay_stage` replays transactions broadcast by the leader. use crate::{ + ancestor_hashes_service::AncestorHashesReplayUpdateSender, broadcast_stage::RetransmitSlotsSender, cache_block_meta_service::CacheBlockMetaSender, cluster_info_vote_listener::{ @@ -126,6 +127,7 @@ pub struct ReplayStageConfig { pub cache_block_meta_sender: Option, pub bank_notification_sender: Option, pub wait_for_vote_to_start_leader: bool, + pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender, } #[derive(Default)] @@ -333,6 +335,7 @@ impl ReplayStage { cache_block_meta_sender, bank_notification_sender, wait_for_vote_to_start_leader, + ancestor_hashes_replay_update_sender, } = config; trace!("replay stage"); @@ -417,7 +420,8 @@ impl ReplayStage { &mut latest_validator_votes_for_frozen_banks, &cluster_slots_update_sender, &cost_update_sender, - &mut duplicate_slots_to_repair + &mut duplicate_slots_to_repair, + &ancestor_hashes_replay_update_sender, ); replay_active_banks_time.stop(); @@ -433,7 +437,8 @@ impl ReplayStage { &bank_forks, &mut progress, &mut heaviest_subtree_fork_choice, - &mut duplicate_slots_to_repair + &mut duplicate_slots_to_repair, + &ancestor_hashes_replay_update_sender, ); process_gossip_duplicate_confirmed_slots_time.stop(); @@ -463,7 +468,8 @@ impl ReplayStage { &bank_forks, &mut progress, &mut heaviest_subtree_fork_choice, - &mut duplicate_slots_to_repair + &mut duplicate_slots_to_repair, + &ancestor_hashes_replay_update_sender, ); } process_duplicate_slots_time.stop(); @@ -505,7 +511,7 @@ impl ReplayStage { &bank_forks, ); - Self::mark_slots_confirmed(&confirmed_forks, &blockstore, &bank_forks, &mut progress, &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair); + Self::mark_slots_confirmed(&confirmed_forks, &blockstore, &bank_forks, &mut progress, &mut duplicate_slots_tracker, &mut heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, &ancestor_hashes_replay_update_sender); } compute_slot_stats_time.stop(); @@ -1075,6 +1081,7 @@ impl ReplayStage { progress: &mut ProgressMap, fork_choice: &mut HeaviestSubtreeForkChoice, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, + ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, ) { let root = bank_forks.read().unwrap().root(); for new_confirmed_slots in gossip_duplicate_confirmed_slots_receiver.try_iter() { @@ -1101,6 +1108,7 @@ impl ReplayStage { duplicate_slots_tracker, fork_choice, duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), ); } @@ -1136,6 +1144,7 @@ impl ReplayStage { progress: &mut ProgressMap, fork_choice: &mut HeaviestSubtreeForkChoice, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, + ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, ) { let new_duplicate_slots: Vec = duplicate_slots_receiver.try_iter().collect(); let (root_slot, bank_hashes) = { @@ -1165,6 +1174,7 @@ impl ReplayStage { duplicate_slots_tracker, fork_choice, duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, SlotStateUpdate::Duplicate(duplicate_state), ); } @@ -1413,6 +1423,7 @@ impl ReplayStage { progress: &mut ProgressMap, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, + ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, ) { // Do not remove from progress map when marking dead! Needed by // `process_gossip_duplicate_confirmed_slots()` @@ -1460,6 +1471,7 @@ impl ReplayStage { duplicate_slots_tracker, heaviest_subtree_fork_choice, duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, SlotStateUpdate::Dead(dead_state), ); } @@ -1859,6 +1871,7 @@ impl ReplayStage { cluster_slots_update_sender: &ClusterSlotsUpdateSender, cost_update_sender: &Sender, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, + ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, ) -> bool { let mut did_complete_bank = false; let mut tx_count = 0; @@ -1926,6 +1939,7 @@ impl ReplayStage { progress, heaviest_subtree_fork_choice, duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, ); // If the bank was corrupted, don't try to run the below logic to check if the // bank is completed @@ -1974,6 +1988,7 @@ impl ReplayStage { duplicate_slots_tracker, heaviest_subtree_fork_choice, duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, SlotStateUpdate::BankFrozen(bank_frozen_state), ); if let Some(sender) = bank_notification_sender { @@ -2490,6 +2505,7 @@ impl ReplayStage { duplicate_slots_tracker: &mut DuplicateSlotsTracker, fork_choice: &mut HeaviestSubtreeForkChoice, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, + ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, ) { let root_slot = bank_forks.read().unwrap().root(); for (slot, frozen_hash) in confirmed_forks.iter() { @@ -2515,6 +2531,7 @@ impl ReplayStage { duplicate_slots_tracker, fork_choice, duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), ); } @@ -3393,6 +3410,8 @@ pub mod tests { block_commitment_cache, OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), )); + let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) = + unbounded(); if let Err(err) = &res { ReplayStage::mark_dead_slot( &blockstore, @@ -3405,6 +3424,7 @@ pub mod tests { &mut progress, &mut heaviest_subtree_fork_choice, &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, ); } @@ -4842,6 +4862,8 @@ pub mod tests { || progress.is_dead(4).unwrap_or(false), || Some(bank4_hash), ); + let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) = + unbounded(); check_slot_agrees_with_cluster( 4, bank_forks.read().unwrap().root(), @@ -4849,6 +4871,7 @@ pub mod tests { &mut duplicate_slots_tracker, &mut vote_simulator.heaviest_subtree_fork_choice, &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, SlotStateUpdate::Duplicate(duplicate_state), ); @@ -4880,6 +4903,7 @@ pub mod tests { &mut duplicate_slots_tracker, &mut vote_simulator.heaviest_subtree_fork_choice, &mut DuplicateSlotsToRepair::default(), + &ancestor_hashes_replay_update_sender, SlotStateUpdate::Duplicate(duplicate_state), ); @@ -4912,6 +4936,7 @@ pub mod tests { &mut duplicate_slots_tracker, &mut vote_simulator.heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, + &ancestor_hashes_replay_update_sender, SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), ); // The confirmed hash is detected in `progress`, which means @@ -5048,6 +5073,8 @@ pub mod tests { || progress.is_dead(2).unwrap_or(false), || Some(our_bank2_hash), ); + let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) = + unbounded(); check_slot_agrees_with_cluster( 2, bank_forks.read().unwrap().root(), @@ -5055,6 +5082,7 @@ pub mod tests { &mut duplicate_slots_tracker, heaviest_subtree_fork_choice, &mut duplicate_slots_to_repair, + &ancestor_hashes_replay_update_sender, SlotStateUpdate::DuplicateConfirmed(duplicate_confirmed_state), ); assert!(duplicate_slots_to_repair.contains(&(2, duplicate_confirmed_bank2_hash))); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index ee8bd78b6..9964844cd 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -2,6 +2,7 @@ #![allow(clippy::rc_buffer)] use crate::{ + ancestor_hashes_service::AncestorHashesReplayUpdateReceiver, cluster_info_vote_listener::VerifiedVoteReceiver, cluster_nodes::ClusterNodes, cluster_slots::ClusterSlots, @@ -545,6 +546,7 @@ impl RetransmitStage { max_slots: &Arc, rpc_subscriptions: Option>, duplicate_slots_sender: Sender, + ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); @@ -603,6 +605,7 @@ impl RetransmitStage { verified_vote_receiver, completed_data_sets_sender, duplicate_slots_sender, + ancestor_hashes_replay_update_receiver, ); Self { diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 06139ab34..4fd64accd 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -169,6 +169,8 @@ impl Tvu { let max_compaction_jitter = tvu_config.rocksdb_max_compaction_jitter; let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded(); let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded(); + let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) = + unbounded(); let retransmit_stage = RetransmitStage::new( bank_forks.clone(), leader_schedule_cache, @@ -190,6 +192,7 @@ impl Tvu { max_slots, Some(rpc_subscriptions.clone()), duplicate_slots_sender, + ancestor_hashes_replay_update_receiver, ); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); @@ -273,6 +276,7 @@ impl Tvu { cache_block_meta_sender, bank_notification_sender, wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader, + ancestor_hashes_replay_update_sender, }; let (voting_sender, voting_receiver) = channel(); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 227748b1c..8ed795a3e 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -2,6 +2,7 @@ //! blockstore and retransmitting where required //! use crate::{ + ancestor_hashes_service::AncestorHashesReplayUpdateReceiver, cluster_info_vote_listener::VerifiedVoteReceiver, completed_data_sets_service::CompletedDataSetsSender, outstanding_requests::OutstandingRequests, @@ -341,6 +342,7 @@ impl WindowService { verified_vote_receiver: VerifiedVoteReceiver, completed_data_sets_sender: CompletedDataSetsSender, duplicate_slots_sender: DuplicateSlotSender, + ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, ) -> WindowService where F: 'static @@ -362,6 +364,7 @@ impl WindowService { repair_info, verified_vote_receiver, outstanding_requests.clone(), + ancestor_hashes_replay_update_receiver, ); let (insert_sender, insert_receiver) = unbounded();