Add feature flag for LastIndex and Erasure duplicate proofs (#34360)
* Add feature flag for LastIndex and Erasure duplicate proofs * pr feedback: use root bank instead of 2 params * pr feedback: & instead of &Arc * pr feedback: reuse fn, remove redundant clones * rebase: fix feature set conflict
This commit is contained in:
parent
210d320f16
commit
def3bc4c4f
|
@ -1,6 +1,7 @@
|
|||
//! `window_service` handles the data plane incoming shreds, storing them in
|
||||
//! blockstore and retransmitting where required
|
||||
//!
|
||||
|
||||
use {
|
||||
crate::{
|
||||
cluster_info_vote_listener::VerifiedVoteReceiver,
|
||||
|
@ -28,7 +29,12 @@ use {
|
|||
solana_metrics::inc_new_counter_error,
|
||||
solana_perf::packet::{Packet, PacketBatch},
|
||||
solana_rayon_threadlimit::get_thread_count,
|
||||
solana_sdk::clock::Slot,
|
||||
solana_runtime::bank_forks::BankForks,
|
||||
solana_sdk::{
|
||||
clock::{Slot, DEFAULT_MS_PER_SLOT},
|
||||
feature_set,
|
||||
},
|
||||
solana_turbine::cluster_nodes,
|
||||
std::{
|
||||
cmp::Reverse,
|
||||
collections::{HashMap, HashSet},
|
||||
|
@ -142,12 +148,31 @@ fn run_check_duplicate(
|
|||
blockstore: &Blockstore,
|
||||
shred_receiver: &Receiver<PossibleDuplicateShred>,
|
||||
duplicate_slots_sender: &DuplicateSlotSender,
|
||||
bank_forks: &RwLock<BankForks>,
|
||||
) -> Result<()> {
|
||||
let mut root_bank = bank_forks.read().unwrap().root_bank();
|
||||
let mut last_updated = Instant::now();
|
||||
let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> {
|
||||
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
|
||||
// Grabs bank forks lock once a slot
|
||||
last_updated = Instant::now();
|
||||
root_bank = bank_forks.read().unwrap().root_bank();
|
||||
}
|
||||
let shred_slot = shred.slot();
|
||||
let send_index_and_erasure_conflicts = cluster_nodes::check_feature_activation(
|
||||
&feature_set::index_erasure_conflict_duplicate_proofs::id(),
|
||||
shred_slot,
|
||||
&root_bank,
|
||||
);
|
||||
let (shred1, shred2) = match shred {
|
||||
PossibleDuplicateShred::LastIndexConflict(shred, conflict) => (shred, conflict),
|
||||
PossibleDuplicateShred::ErasureConflict(shred, conflict) => (shred, conflict),
|
||||
PossibleDuplicateShred::LastIndexConflict(shred, conflict)
|
||||
| PossibleDuplicateShred::ErasureConflict(shred, conflict) => {
|
||||
if send_index_and_erasure_conflicts {
|
||||
(shred, conflict)
|
||||
} else {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
PossibleDuplicateShred::Exists(shred) => {
|
||||
// Unlike the other cases we have to wait until here to decide to handle the duplicate and store
|
||||
// in blockstore. This is because the duplicate could have been part of the same insert batch,
|
||||
|
@ -342,6 +367,7 @@ impl WindowService {
|
|||
let outstanding_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
|
||||
|
||||
let cluster_info = repair_info.cluster_info.clone();
|
||||
let bank_forks = repair_info.bank_forks.clone();
|
||||
|
||||
let repair_service = RepairService::new(
|
||||
blockstore.clone(),
|
||||
|
@ -366,6 +392,7 @@ impl WindowService {
|
|||
blockstore.clone(),
|
||||
duplicate_receiver,
|
||||
duplicate_slots_sender,
|
||||
bank_forks,
|
||||
);
|
||||
|
||||
let t_insert = Self::start_window_insert_thread(
|
||||
|
@ -392,6 +419,7 @@ impl WindowService {
|
|||
blockstore: Arc<Blockstore>,
|
||||
duplicate_receiver: Receiver<PossibleDuplicateShred>,
|
||||
duplicate_slots_sender: DuplicateSlotSender,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
) -> JoinHandle<()> {
|
||||
let handle_error = || {
|
||||
inc_new_counter_error!("solana-check-duplicate-error", 1, 1);
|
||||
|
@ -405,6 +433,7 @@ impl WindowService {
|
|||
&blockstore,
|
||||
&duplicate_receiver,
|
||||
&duplicate_slots_sender,
|
||||
&bank_forks,
|
||||
) {
|
||||
if Self::should_exit_on_error(e, &handle_error) {
|
||||
break;
|
||||
|
@ -507,9 +536,11 @@ mod test {
|
|||
solana_gossip::contact_info::ContactInfo,
|
||||
solana_ledger::{
|
||||
blockstore::{make_many_slot_entries, Blockstore},
|
||||
genesis_utils::create_genesis_config,
|
||||
get_tmp_ledger_path_auto_delete,
|
||||
shred::{ProcessShredsStats, Shredder},
|
||||
},
|
||||
solana_runtime::bank::Bank,
|
||||
solana_sdk::{
|
||||
hash::Hash,
|
||||
signature::{Keypair, Signer},
|
||||
|
@ -556,6 +587,8 @@ mod test {
|
|||
#[test]
|
||||
fn test_run_check_duplicate() {
|
||||
let ledger_path = get_tmp_ledger_path_auto_delete!();
|
||||
let genesis_config = create_genesis_config(10_000).genesis_config;
|
||||
let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
|
||||
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
|
||||
let (sender, receiver) = unbounded();
|
||||
let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
|
||||
|
@ -587,6 +620,7 @@ mod test {
|
|||
&blockstore,
|
||||
&receiver,
|
||||
&duplicate_slot_sender,
|
||||
&bank_forks,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -616,6 +650,8 @@ mod test {
|
|||
Arc::new(keypair),
|
||||
SocketAddrSpace::Unspecified,
|
||||
));
|
||||
let genesis_config = create_genesis_config(10_000).genesis_config;
|
||||
let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
|
||||
|
||||
// Start duplicate thread receiving and inserting duplicates
|
||||
let t_check_duplicate = WindowService::start_check_duplicate_thread(
|
||||
|
@ -624,6 +660,7 @@ mod test {
|
|||
blockstore.clone(),
|
||||
duplicate_shred_receiver,
|
||||
duplicate_slot_sender,
|
||||
bank_forks,
|
||||
);
|
||||
|
||||
let handle_duplicate = |shred| {
|
||||
|
|
|
@ -744,6 +744,10 @@ pub mod consume_blockstore_duplicate_proofs {
|
|||
solana_sdk::declare_id!("6YsBCejwK96GZCkJ6mkZ4b68oP63z2PLoQmWjC7ggTqZ");
|
||||
}
|
||||
|
||||
pub mod index_erasure_conflict_duplicate_proofs {
|
||||
solana_sdk::declare_id!("dupPajaLy2SSn8ko42aZz4mHANDNrLe8Nw8VQgFecLa");
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
/// Map of feature identifiers to user-visible description
|
||||
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
|
||||
|
@ -924,7 +928,8 @@ lazy_static! {
|
|||
(enable_zk_transfer_with_fee::id(), "enable Zk Token proof program transfer with fee"),
|
||||
(drop_legacy_shreds::id(), "drops legacy shreds #34328"),
|
||||
(allow_commission_decrease_at_any_time::id(), "Allow commission decrease at any time in epoch #33843"),
|
||||
(consume_blockstore_duplicate_proofs::id(), "consume duplicate proofs from blockstore in consensus #34372")
|
||||
(consume_blockstore_duplicate_proofs::id(), "consume duplicate proofs from blockstore in consensus #34372"),
|
||||
(index_erasure_conflict_duplicate_proofs::id(), "generate duplicate proofs for index and erasure conflicts #34360"),
|
||||
/*************** ADD NEW FEATURES HERE ***************/
|
||||
]
|
||||
.iter()
|
||||
|
|
|
@ -513,7 +513,7 @@ fn enable_turbine_fanout_experiments(shred_slot: Slot, root_bank: &Bank) -> bool
|
|||
|
||||
// Returns true if the feature is effective for the shred slot.
|
||||
#[must_use]
|
||||
fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool {
|
||||
pub fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool {
|
||||
match root_bank.feature_set.activated_slot(feature) {
|
||||
None => false,
|
||||
Some(feature_slot) => {
|
||||
|
|
Loading…
Reference in New Issue