banking_stage: track VoteStorage size instead of costly iterator (#743)

* banking_stage: track VoteStorage size instead of costly iterator

* missed one

---------

Co-authored-by: steviez <steven@anza.xyz>
This commit is contained in:
Ashwin Sekar 2024-04-11 07:16:33 -07:00 committed by GitHub
parent 4e2fc2772b
commit 4fc6b0bd96
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 17 additions and 11 deletions

View File

@ -16,7 +16,10 @@ use {
std::{
collections::HashMap,
ops::DerefMut,
sync::{Arc, RwLock},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
},
},
};
@ -144,6 +147,7 @@ pub(crate) struct VoteBatchInsertionMetrics {
#[derive(Debug, Default)]
pub struct LatestUnprocessedVotes {
latest_votes_per_pubkey: RwLock<HashMap<Pubkey, Arc<RwLock<LatestValidatorVotePacket>>>>,
num_unprocessed_votes: AtomicUsize,
}
impl LatestUnprocessedVotes {
@ -151,14 +155,8 @@ impl LatestUnprocessedVotes {
Self::default()
}
/// Expensive because this involves iterating through and locking every unprocessed vote
pub fn len(&self) -> usize {
self.latest_votes_per_pubkey
.read()
.unwrap()
.values()
.filter(|lock| !lock.read().unwrap().is_vote_taken())
.count()
self.num_unprocessed_votes.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
@ -220,6 +218,7 @@ impl LatestUnprocessedVotes {
if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) {
let old_vote = std::mem::replace(latest_vote.deref_mut(), vote);
if old_vote.is_vote_taken() {
self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed);
return None;
} else {
return Some(old_vote);
@ -233,6 +232,7 @@ impl LatestUnprocessedVotes {
// and when a new vote account starts voting.
let mut latest_votes_per_pubkey = self.latest_votes_per_pubkey.write().unwrap();
latest_votes_per_pubkey.insert(pubkey, Arc::new(RwLock::new(vote)));
self.num_unprocessed_votes.fetch_add(1, Ordering::Relaxed);
None
}
@ -319,7 +319,10 @@ impl LatestUnprocessedVotes {
.filter_map(|pubkey| {
self.get_entry(pubkey).and_then(|lock| {
let mut latest_vote = lock.write().unwrap();
latest_vote.take_vote()
latest_vote.take_vote().map(|vote| {
self.num_unprocessed_votes.fetch_sub(1, Ordering::Relaxed);
vote
})
})
})
.collect_vec()
@ -335,8 +338,8 @@ impl LatestUnprocessedVotes {
.filter(|lock| lock.read().unwrap().is_forwarded())
.for_each(|lock| {
let mut vote = lock.write().unwrap();
if vote.is_forwarded() {
vote.take_vote();
if vote.is_forwarded() && vote.take_vote().is_some() {
self.num_unprocessed_votes.fetch_sub(1, Ordering::Relaxed);
}
});
}
@ -726,6 +729,9 @@ mod tests {
let mut latest_vote = lock.write().unwrap();
if !latest_vote.is_vote_taken() {
latest_vote.take_vote();
latest_unprocessed_votes_tpu
.num_unprocessed_votes
.fetch_sub(1, Ordering::Relaxed);
}
});
}