reduce locking in propagated check for VoteStateUpdate (#33997)
This commit is contained in:
parent
eb7e68f029
commit
fb76b4cb6c
|
@ -44,6 +44,7 @@ use {
|
|||
vote_transaction::VoteTransaction,
|
||||
},
|
||||
std::{
|
||||
cmp::max,
|
||||
collections::{HashMap, HashSet},
|
||||
iter::repeat,
|
||||
sync::{
|
||||
|
@ -499,6 +500,7 @@ impl ClusterInfoVoteListener {
|
|||
) -> Result<()> {
|
||||
let mut confirmation_verifier =
|
||||
OptimisticConfirmationVerifier::new(bank_forks.read().unwrap().root());
|
||||
let mut latest_vote_slot_per_validator = HashMap::new();
|
||||
let mut last_process_root = Instant::now();
|
||||
let duplicate_confirmed_slot_sender = Some(duplicate_confirmed_slot_sender);
|
||||
let mut vote_processing_time = Some(VoteProcessingTiming::default());
|
||||
|
@ -533,6 +535,7 @@ impl ClusterInfoVoteListener {
|
|||
&bank_notification_sender,
|
||||
&duplicate_confirmed_slot_sender,
|
||||
&mut vote_processing_time,
|
||||
&mut latest_vote_slot_per_validator,
|
||||
);
|
||||
match confirmed_slots {
|
||||
Ok(confirmed_slots) => {
|
||||
|
@ -573,6 +576,7 @@ impl ClusterInfoVoteListener {
|
|||
&None,
|
||||
&None,
|
||||
&mut None,
|
||||
&mut HashMap::new(),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -588,6 +592,7 @@ impl ClusterInfoVoteListener {
|
|||
bank_notification_sender: &Option<BankNotificationSender>,
|
||||
duplicate_confirmed_slot_sender: &Option<DuplicateConfirmedSlotsSender>,
|
||||
vote_processing_time: &mut Option<VoteProcessingTiming>,
|
||||
latest_vote_slot_per_validator: &mut HashMap<Pubkey, Slot>,
|
||||
) -> Result<ThresholdConfirmedSlots> {
|
||||
let mut sel = Select::new();
|
||||
sel.recv(gossip_vote_txs_receiver);
|
||||
|
@ -617,6 +622,7 @@ impl ClusterInfoVoteListener {
|
|||
bank_notification_sender,
|
||||
duplicate_confirmed_slot_sender,
|
||||
vote_processing_time,
|
||||
latest_vote_slot_per_validator,
|
||||
));
|
||||
}
|
||||
remaining_wait_time = remaining_wait_time.saturating_sub(start.elapsed());
|
||||
|
@ -639,6 +645,7 @@ impl ClusterInfoVoteListener {
|
|||
is_gossip_vote: bool,
|
||||
bank_notification_sender: &Option<BankNotificationSender>,
|
||||
duplicate_confirmed_slot_sender: &Option<DuplicateConfirmedSlotsSender>,
|
||||
latest_vote_slot_per_validator: &mut HashMap<Pubkey, Slot>,
|
||||
) {
|
||||
if vote.is_empty() {
|
||||
return;
|
||||
|
@ -646,6 +653,10 @@ impl ClusterInfoVoteListener {
|
|||
|
||||
let (last_vote_slot, last_vote_hash) = vote.last_voted_slot_hash().unwrap();
|
||||
|
||||
let latest_vote_slot = latest_vote_slot_per_validator
|
||||
.entry(*vote_pubkey)
|
||||
.or_insert(0);
|
||||
|
||||
let root = root_bank.slot();
|
||||
let mut is_new_vote = false;
|
||||
let vote_slots = vote.slots();
|
||||
|
@ -724,6 +735,14 @@ impl ClusterInfoVoteListener {
|
|||
is_new_vote = is_new;
|
||||
}
|
||||
|
||||
if slot < *latest_vote_slot {
|
||||
// Important that we filter after the `last_vote_slot` check, as even if this vote
|
||||
// is old, we still need to track optimistic confirmations.
|
||||
// However it is fine to filter the rest of the slots for the propagated check tracking below,
|
||||
// as the propagated check is able to roll up votes for descendants unlike optimistic confirmation.
|
||||
continue;
|
||||
}
|
||||
|
||||
diff.entry(slot)
|
||||
.or_default()
|
||||
.entry(*vote_pubkey)
|
||||
|
@ -733,6 +752,8 @@ impl ClusterInfoVoteListener {
|
|||
.or_insert(is_gossip_vote);
|
||||
}
|
||||
|
||||
*latest_vote_slot = max(*latest_vote_slot, last_vote_slot);
|
||||
|
||||
if is_new_vote {
|
||||
subscriptions.notify_vote(*vote_pubkey, vote, vote_transaction_signature);
|
||||
let _ = verified_vote_sender.send((*vote_pubkey, vote_slots));
|
||||
|
@ -751,6 +772,7 @@ impl ClusterInfoVoteListener {
|
|||
bank_notification_sender: &Option<BankNotificationSender>,
|
||||
duplicate_confirmed_slot_sender: &Option<DuplicateConfirmedSlotsSender>,
|
||||
vote_processing_time: &mut Option<VoteProcessingTiming>,
|
||||
latest_vote_slot_per_validator: &mut HashMap<Pubkey, Slot>,
|
||||
) -> ThresholdConfirmedSlots {
|
||||
let mut diff: HashMap<Slot, HashMap<Pubkey, bool>> = HashMap::new();
|
||||
let mut new_optimistic_confirmed_slots = vec![];
|
||||
|
@ -777,6 +799,7 @@ impl ClusterInfoVoteListener {
|
|||
is_gossip,
|
||||
bank_notification_sender,
|
||||
duplicate_confirmed_slot_sender,
|
||||
latest_vote_slot_per_validator,
|
||||
);
|
||||
}
|
||||
gossip_vote_txn_processing_time.stop();
|
||||
|
@ -875,6 +898,7 @@ mod tests {
|
|||
use {
|
||||
super::*,
|
||||
crate::banking_trace::BankingTracer,
|
||||
itertools::Itertools,
|
||||
solana_perf::packet,
|
||||
solana_rpc::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
|
||||
solana_runtime::{
|
||||
|
@ -890,7 +914,10 @@ mod tests {
|
|||
signature::{Keypair, Signature, Signer},
|
||||
},
|
||||
solana_vote::vote_sender_types::ReplayVoteSender,
|
||||
solana_vote_program::{vote_state::Vote, vote_transaction},
|
||||
solana_vote_program::{
|
||||
vote_state::{Vote, VoteStateUpdate},
|
||||
vote_transaction,
|
||||
},
|
||||
std::{
|
||||
collections::BTreeSet,
|
||||
iter::repeat_with,
|
||||
|
@ -987,6 +1014,7 @@ mod tests {
|
|||
let (verified_vote_sender, _verified_vote_receiver) = unbounded();
|
||||
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
|
||||
let (replay_votes_sender, replay_votes_receiver) = unbounded();
|
||||
let mut latest_vote_slot_per_validator = HashMap::new();
|
||||
|
||||
let GenesisConfigInfo { genesis_config, .. } =
|
||||
genesis_utils::create_genesis_config_with_vote_accounts(
|
||||
|
@ -1022,6 +1050,7 @@ mod tests {
|
|||
&None,
|
||||
&None,
|
||||
&mut None,
|
||||
&mut latest_vote_slot_per_validator,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -1054,6 +1083,7 @@ mod tests {
|
|||
&None,
|
||||
&None,
|
||||
&mut None,
|
||||
&mut latest_vote_slot_per_validator,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -1105,6 +1135,7 @@ mod tests {
|
|||
let (replay_votes_sender, replay_votes_receiver) = unbounded();
|
||||
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
|
||||
let (verified_vote_sender, verified_vote_receiver) = unbounded();
|
||||
let mut latest_vote_slot_per_validator = HashMap::new();
|
||||
|
||||
let GenesisConfigInfo { genesis_config, .. } =
|
||||
genesis_utils::create_genesis_config_with_vote_accounts(
|
||||
|
@ -1137,6 +1168,7 @@ mod tests {
|
|||
&None,
|
||||
&None,
|
||||
&mut None,
|
||||
&mut latest_vote_slot_per_validator,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -1255,6 +1287,7 @@ mod tests {
|
|||
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
|
||||
let (verified_vote_sender, verified_vote_receiver) = unbounded();
|
||||
let (_replay_votes_sender, replay_votes_receiver) = unbounded();
|
||||
let mut latest_vote_slot_per_validator = HashMap::new();
|
||||
|
||||
let mut expected_votes = vec![];
|
||||
let num_voters_per_slot = 2;
|
||||
|
@ -1295,6 +1328,7 @@ mod tests {
|
|||
&None,
|
||||
&None,
|
||||
&mut None,
|
||||
&mut latest_vote_slot_per_validator,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -1340,6 +1374,7 @@ mod tests {
|
|||
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
|
||||
let (replay_votes_sender, replay_votes_receiver): (ReplayVoteSender, ReplayVoteReceiver) =
|
||||
unbounded();
|
||||
let mut latest_vote_slot_per_validator = HashMap::new();
|
||||
|
||||
let vote_slot = 1;
|
||||
let vote_bank_hash = Hash::default();
|
||||
|
@ -1396,6 +1431,7 @@ mod tests {
|
|||
&None,
|
||||
&None,
|
||||
&mut None,
|
||||
&mut latest_vote_slot_per_validator,
|
||||
);
|
||||
}
|
||||
let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap();
|
||||
|
@ -1454,6 +1490,7 @@ mod tests {
|
|||
Arc::new(RwLock::new(BlockCommitmentCache::default())),
|
||||
optimistically_confirmed_bank,
|
||||
));
|
||||
let mut latest_vote_slot_per_validator = HashMap::new();
|
||||
|
||||
// Send a vote to process, should add a reference to the pubkey for that voter
|
||||
// in the tracker
|
||||
|
@ -1489,6 +1526,7 @@ mod tests {
|
|||
&None,
|
||||
&None,
|
||||
&mut None,
|
||||
&mut latest_vote_slot_per_validator,
|
||||
);
|
||||
|
||||
// Setup next epoch
|
||||
|
@ -1536,6 +1574,7 @@ mod tests {
|
|||
&None,
|
||||
&None,
|
||||
&mut None,
|
||||
&mut latest_vote_slot_per_validator,
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -1761,4 +1800,103 @@ mod tests {
|
|||
.previously_sent_to_bank_votes
|
||||
.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_track_new_votes_filter() {
|
||||
let validator_keypairs: Vec<_> =
|
||||
(0..2).map(|_| ValidatorVoteKeypairs::new_rand()).collect();
|
||||
|
||||
let GenesisConfigInfo { genesis_config, .. } =
|
||||
genesis_utils::create_genesis_config_with_vote_accounts(
|
||||
10_000,
|
||||
&validator_keypairs,
|
||||
vec![100; validator_keypairs.len()],
|
||||
);
|
||||
let bank = Bank::new_for_tests(&genesis_config);
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let bank_forks = BankForks::new_rw_arc(bank);
|
||||
let bank = bank_forks.read().unwrap().get(0).unwrap();
|
||||
let vote_tracker = VoteTracker::default();
|
||||
let optimistically_confirmed_bank =
|
||||
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
|
||||
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
|
||||
let max_complete_rewards_slot = Arc::new(AtomicU64::default());
|
||||
let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
|
||||
exit,
|
||||
max_complete_transaction_status_slot,
|
||||
max_complete_rewards_slot,
|
||||
bank_forks,
|
||||
Arc::new(RwLock::new(BlockCommitmentCache::default())),
|
||||
optimistically_confirmed_bank,
|
||||
));
|
||||
let mut latest_vote_slot_per_validator = HashMap::new();
|
||||
|
||||
let (verified_vote_sender, _verified_vote_receiver) = unbounded();
|
||||
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
|
||||
let mut diff = HashMap::default();
|
||||
let mut new_optimistic_confirmed_slots = vec![];
|
||||
|
||||
let validator0_keypairs = &validator_keypairs[0];
|
||||
let (vote_pubkey, vote, _, signature) = vote_parser::parse_vote_transaction(
|
||||
&vote_transaction::new_vote_state_update_transaction(
|
||||
VoteStateUpdate::from(vec![(1, 3), (2, 2), (6, 1)]),
|
||||
Hash::default(),
|
||||
&validator0_keypairs.node_keypair,
|
||||
&validator0_keypairs.vote_keypair,
|
||||
&validator0_keypairs.vote_keypair,
|
||||
None,
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
ClusterInfoVoteListener::track_new_votes_and_notify_confirmations(
|
||||
vote,
|
||||
&vote_pubkey,
|
||||
signature,
|
||||
&vote_tracker,
|
||||
&bank,
|
||||
&subscriptions,
|
||||
&verified_vote_sender,
|
||||
&gossip_verified_vote_hash_sender,
|
||||
&mut diff,
|
||||
&mut new_optimistic_confirmed_slots,
|
||||
true, /* is gossip */
|
||||
&None,
|
||||
&None,
|
||||
&mut latest_vote_slot_per_validator,
|
||||
);
|
||||
assert_eq!(diff.keys().copied().sorted().collect_vec(), vec![1, 2, 6]);
|
||||
|
||||
// Vote on a new slot, only those later than 6 should show up. 4 is skipped.
|
||||
diff.clear();
|
||||
let (vote_pubkey, vote, _, signature) = vote_parser::parse_vote_transaction(
|
||||
&vote_transaction::new_vote_state_update_transaction(
|
||||
VoteStateUpdate::from(vec![(1, 6), (2, 5), (3, 4), (4, 3), (7, 2), (8, 1)]),
|
||||
Hash::default(),
|
||||
&validator0_keypairs.node_keypair,
|
||||
&validator0_keypairs.vote_keypair,
|
||||
&validator0_keypairs.vote_keypair,
|
||||
None,
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
ClusterInfoVoteListener::track_new_votes_and_notify_confirmations(
|
||||
vote,
|
||||
&vote_pubkey,
|
||||
signature,
|
||||
&vote_tracker,
|
||||
&bank,
|
||||
&subscriptions,
|
||||
&verified_vote_sender,
|
||||
&gossip_verified_vote_hash_sender,
|
||||
&mut diff,
|
||||
&mut new_optimistic_confirmed_slots,
|
||||
true, /* is gossip */
|
||||
&None,
|
||||
&None,
|
||||
&mut latest_vote_slot_per_validator,
|
||||
);
|
||||
assert_eq!(diff.keys().copied().sorted().collect_vec(), vec![7, 8]);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue