removes Select in favor of recv_timeout/try_iter (#21981)
crossbeam_channel::Select::ready_timeout might return with success spuriously.
This commit is contained in:
parent
3fe942ab30
commit
7476dfeec0
|
@ -97,11 +97,8 @@ impl AggregateCommitmentService {
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
let mut aggregation_data = receiver.recv_timeout(Duration::from_secs(1))?;
|
||||
|
||||
while let Ok(new_data) = receiver.try_recv() {
|
||||
aggregation_data = new_data;
|
||||
}
|
||||
let aggregation_data = receiver.recv_timeout(Duration::from_secs(1))?;
|
||||
let aggregation_data = receiver.try_iter().last().unwrap_or(aggregation_data);
|
||||
|
||||
let ancestors = aggregation_data.bank.status_cache_ancestors();
|
||||
if ancestors.is_empty() {
|
||||
|
|
|
@ -164,12 +164,9 @@ impl LedgerCleanupService {
|
|||
}
|
||||
|
||||
fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
|
||||
let mut root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
|
||||
let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
|
||||
// Get the newest root
|
||||
while let Ok(new_root) = new_root_receiver.try_recv() {
|
||||
root = new_root;
|
||||
}
|
||||
Ok(root)
|
||||
Ok(new_root_receiver.try_iter().last().unwrap_or(root))
|
||||
}
|
||||
|
||||
pub fn cleanup_ledger(
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
use {
|
||||
crate::{cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, result::Result},
|
||||
crossbeam_channel::Select,
|
||||
solana_perf::packet::PacketBatch,
|
||||
solana_runtime::bank::Bank,
|
||||
solana_sdk::{
|
||||
|
@ -141,10 +140,10 @@ impl VerifiedVotePackets {
|
|||
vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
|
||||
would_be_leader: bool,
|
||||
) -> Result<()> {
|
||||
let mut sel = Select::new();
|
||||
sel.recv(vote_packets_receiver);
|
||||
let _ = sel.ready_timeout(Duration::from_millis(200))?;
|
||||
for gossip_votes in vote_packets_receiver.try_iter() {
|
||||
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
|
||||
let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?;
|
||||
let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter());
|
||||
for gossip_votes in vote_packets {
|
||||
if would_be_leader {
|
||||
for verfied_vote_metadata in gossip_votes {
|
||||
let VerifiedVoteMetadata {
|
||||
|
@ -284,7 +283,7 @@ mod tests {
|
|||
// No new messages, should time out
|
||||
assert_matches!(
|
||||
verified_vote_packets.receive_and_process_vote_packets(&r, true),
|
||||
Err(Error::ReadyTimeout)
|
||||
Err(Error::CrossbeamRecvTimeout(_))
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -233,14 +233,10 @@ fn run_check_duplicate(
|
|||
|
||||
Ok(())
|
||||
};
|
||||
let timer = Duration::from_millis(200);
|
||||
let shred = shred_receiver.recv_timeout(timer)?;
|
||||
check_duplicate(shred)?;
|
||||
while let Ok(shred) = shred_receiver.try_recv() {
|
||||
check_duplicate(shred)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
|
||||
std::iter::once(shred_receiver.recv_timeout(RECV_TIMEOUT)?)
|
||||
.chain(shred_receiver.try_iter())
|
||||
.try_for_each(check_duplicate)
|
||||
}
|
||||
|
||||
fn verify_repair(
|
||||
|
|
Loading…
Reference in New Issue