bank_send_loop: Reduce feature flag polling frequency

This commit is contained in:
Trent Nelson 2023-06-12 22:46:20 -06:00 committed by Trent Nelson
parent dd379bfad8
commit 077e29aa1e
2 changed files with 43 additions and 52 deletions

View File

@ -36,6 +36,7 @@ use {
},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT},
feature_set::allow_votes_to_directly_update_vote_state,
hash::Hash,
pubkey::Pubkey,
signature::Signature,
@ -386,6 +387,12 @@ impl ClusterInfoVoteListener {
let mut verified_vote_packets = VerifiedVotePackets::default();
let mut time_since_lock = Instant::now();
let mut bank_vote_sender_state_option: Option<BankVoteSenderState> = None;
let mut is_tower_full_vote_enabled = bank_forks
.read()
.unwrap()
.root_bank()
.feature_set
.is_active(&allow_votes_to_directly_update_vote_state::id());
loop {
if exit.load(Ordering::Relaxed) {
@ -396,12 +403,11 @@ impl ClusterInfoVoteListener {
.read()
.unwrap()
.would_be_leader(3 * slot_hashes::MAX_ENTRIES as u64 * DEFAULT_TICKS_PER_SLOT);
let feature_set = Some(bank_forks.read().unwrap().root_bank().feature_set.clone());
if let Err(e) = verified_vote_packets.receive_and_process_vote_packets(
&verified_vote_label_packets_receiver,
would_be_leader,
feature_set,
is_tower_full_vote_enabled,
) {
match e {
Error::RecvTimeout(RecvTimeoutError::Disconnected)
@ -425,6 +431,15 @@ impl ClusterInfoVoteListener {
&verified_vote_packets,
)?;
}
// Check if we've crossed the feature boundary
if !is_tower_full_vote_enabled {
is_tower_full_vote_enabled = bank_forks
.read()
.unwrap()
.root_bank()
.feature_set
.is_active(&allow_votes_to_directly_update_vote_state::id());
}
}
}
}

View File

@ -9,7 +9,6 @@ use {
solana_sdk::{
account::from_account,
clock::{Slot, UnixTimestamp},
feature_set::{allow_votes_to_directly_update_vote_state, FeatureSet},
hash::Hash,
pubkey::Pubkey,
signature::Signature,
@ -211,17 +210,12 @@ impl VerifiedVotePackets {
&mut self,
vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
would_be_leader: bool,
feature_set: Option<Arc<FeatureSet>>,
is_full_tower_vote_enabled: bool,
) -> Result<()> {
use SingleValidatorVotes::*;
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?;
let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter());
let mut is_full_tower_vote_enabled = false;
if let Some(feature_set) = feature_set {
is_full_tower_vote_enabled =
feature_set.is_active(&allow_votes_to_directly_update_vote_state::id());
}
for gossip_votes in vote_packets {
if would_be_leader {
@ -348,7 +342,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
@ -368,7 +362,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
@ -390,7 +384,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
@ -413,7 +407,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
@ -426,7 +420,7 @@ mod tests {
// No new messages, should time out
assert_matches!(
verified_vote_packets.receive_and_process_vote_packets(&r, true, None),
verified_vote_packets.receive_and_process_vote_packets(&r, true, false),
Err(Error::RecvTimeout(_))
);
}
@ -455,7 +449,7 @@ mod tests {
// At most `MAX_VOTES_PER_VALIDATOR` should be stored per validator
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
verified_vote_packets
@ -493,7 +487,7 @@ mod tests {
// Ingest the votes into the buffer
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
// Create tracker for previously sent bank votes
@ -548,7 +542,7 @@ mod tests {
// Ingest the votes into the buffer
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
// One batch of vote packets per validator
@ -608,7 +602,7 @@ mod tests {
.unwrap();
// Ingest the votes into the buffer
verified_vote_packets
.receive_and_process_vote_packets(&r, true, None)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new(
my_leader_bank,
@ -640,11 +634,8 @@ mod tests {
}
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
let feature_set = Some(Arc::new(feature_set));
verified_vote_packets
.receive_and_process_vote_packets(&r, true, feature_set.clone())
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
// second_vote should be kept and first_vote ignored
@ -665,7 +656,7 @@ mod tests {
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, feature_set)
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
let slot = verified_vote_packets
.0
@ -685,9 +676,8 @@ mod tests {
.unwrap();
}
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
let feature_set = FeatureSet::default();
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
@ -705,7 +695,7 @@ mod tests {
r: &Receiver<Vec<VerifiedVoteMetadata>>,
vote: VoteStateUpdate,
vote_account_key: Pubkey,
feature_set: Option<Arc<FeatureSet>>,
is_tower_full_vote_enabled: bool,
verified_vote_packets: &mut VerifiedVotePackets,
) -> GossipVote {
s.send(vec![VerifiedVoteMetadata {
@ -716,7 +706,7 @@ mod tests {
}])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(r, true, feature_set)
.receive_and_process_vote_packets(r, true, is_tower_full_vote_enabled)
.unwrap();
match verified_vote_packets.0.get(&vote_account_key).unwrap() {
SingleValidatorVotes::FullTowerVote(gossip_vote) => gossip_vote.clone(),
@ -743,9 +733,6 @@ mod tests {
vote_no_ts.timestamp = None;
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
let feature_set = Some(Arc::new(feature_set));
// Original vote
let GossipVote {
@ -755,7 +742,7 @@ mod tests {
&r,
vote.clone(),
vote_account_key,
feature_set.clone(),
true,
&mut verified_vote_packets,
);
assert_eq!(slot, vote.last_voted_slot().unwrap());
@ -769,7 +756,7 @@ mod tests {
&r,
vote_later_ts.clone(),
vote_account_key,
feature_set.clone(),
true,
&mut verified_vote_packets,
);
assert_eq!(slot, vote_later_ts.last_voted_slot().unwrap());
@ -783,7 +770,7 @@ mod tests {
&r,
vote_earlier_ts,
vote_account_key,
feature_set.clone(),
true,
&mut verified_vote_packets,
);
assert_eq!(slot, vote_later_ts.last_voted_slot().unwrap());
@ -797,7 +784,7 @@ mod tests {
&r,
vote_no_ts,
vote_account_key,
feature_set,
true,
&mut verified_vote_packets,
);
assert_eq!(slot, vote_later_ts.last_voted_slot().unwrap());
@ -823,9 +810,8 @@ mod tests {
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
// Receive votes without the feature active
let feature_set = Some(Arc::new(FeatureSet::default()));
verified_vote_packets
.receive_and_process_vote_packets(&r, true, feature_set)
.receive_and_process_vote_packets(&r, true, false)
.unwrap();
assert_eq!(
100,
@ -858,10 +844,8 @@ mod tests {
}
// Receive votes with the feature active
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
if let FullTowerVote(vote) = verified_vote_packets.0.get(&vote_account_key).unwrap() {
assert_eq!(200, vote.slot);
@ -887,10 +871,8 @@ mod tests {
.unwrap();
// Receive incremental votes with the feature active
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
// Should still store as incremental votes
@ -918,10 +900,8 @@ mod tests {
.unwrap();
// Receive full votes
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
assert_eq!(
42,
@ -932,7 +912,7 @@ mod tests {
.get_latest_gossip_slot()
);
// Try to send an old ibncremental vote from pre feature activation
// Try to send an old incremental vote from pre feature activation
let vote = VoteTransaction::from(Vote::new(vec![34], Hash::new_unique()));
s.send(vec![VerifiedVoteMetadata {
vote_account_key,
@ -943,10 +923,8 @@ mod tests {
.unwrap();
// Try to receive nothing should happen
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
if let FullTowerVote(vote) = verified_vote_packets.0.get(&vote_account_key).unwrap() {
assert_eq!(42, vote.slot);
@ -966,10 +944,8 @@ mod tests {
.unwrap();
// Try to receive and vote lands as well as the conversion back to incremental votes
let mut feature_set = FeatureSet::default();
feature_set.activate(&allow_votes_to_directly_update_vote_state::id(), 0);
verified_vote_packets
.receive_and_process_vote_packets(&r, true, Some(Arc::new(feature_set)))
.receive_and_process_vote_packets(&r, true, true)
.unwrap();
if let IncrementalVotes(votes) = verified_vote_packets.0.get(&vote_account_key).unwrap() {
assert!(votes.contains_key(&(42, hash_42)));