Set fetch stage socket non blocking to false while during recv (#2542)

* Set fetch stage socket non blocking to false while during recv

* remove ProgramError changes from this PR
This commit is contained in:
Pankaj Garg 2019-01-24 12:46:40 -08:00 committed by GitHub
parent 9abc500269
commit a9b083e585
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 7 additions and 5 deletions

View File

@ -654,7 +654,8 @@ impl Bank {
}
Ok(())
}
pub fn par_execute_entries(&self, entries: &[(&Entry, Vec<Result<()>>)]) -> Result<()> {
fn par_execute_entries(&self, entries: &[(&Entry, Vec<Result<()>>)]) -> Result<()> {
inc_new_counter_info!("bank-par_execute_entries-count", entries.len());
let results: Vec<Result<()>> = entries
.into_par_iter()

View File

@ -19,7 +19,7 @@ impl FetchStage {
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
Self::new_multi_socket(tx_sockets, exit)
}
pub fn new_multi_socket(
fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>,
exit: Arc<AtomicBool>,
) -> (Self, PacketReceiver) {

View File

@ -182,7 +182,6 @@ impl Packets {
Err(_) if i > 0 => {
inc_new_counter_info!("packets-recv_count", i);
debug!("got {:?} messages on {}", i, socket.local_addr().unwrap());
socket.set_nonblocking(true)?;
return Ok(i);
}
Err(e) => {
@ -190,10 +189,12 @@ impl Packets {
return Err(Error::IO(e));
}
Ok(npkts) => {
if i == 0 {
socket.set_nonblocking(true)?;
}
trace!("got {} packets", npkts);
i += npkts;
if npkts != NUM_RCVMMSGS {
socket.set_nonblocking(true)?;
if npkts != NUM_RCVMMSGS || i >= 1024 {
inc_new_counter_info!("packets-recv_count", i);
return Ok(i);
}