diff --git a/tower-batch/src/worker.rs b/tower-batch/src/worker.rs
index 6fc58e3cb..e56ae7727 100644
--- a/tower-batch/src/worker.rs
+++ b/tower-batch/src/worker.rs
@@ -123,10 +123,10 @@ where
         // submitted, so that the batch latency of all entries is at most
         // self.max_latency. However, we don't keep the timer running unless
         // there is a pending request to prevent wakeups on idle services.
-        let mut timer: Option<Sleep> = None;
+        let mut timer: Option<Pin<Box<Sleep>>> = None;
         let mut pending_items = 0usize;
         loop {
-            match timer {
+            match timer.as_mut() {
                 None => match self.rx.next().await {
                     // The first message in a new batch.
                     Some(msg) => {
@@ -135,13 +135,13 @@ where
                             // Apply the provided span to request processing
                             .instrument(span)
                             .await;
-                        timer = Some(sleep(self.max_latency));
+                        timer = Some(Box::pin(sleep(self.max_latency)));
                         pending_items = 1;
                     }
                     // No more messages, ever.
                     None => return,
                 },
-                Some(mut sleep) => {
+                Some(sleep) => {
                     // Wait on either a new message or the batch timer.
                     // If both are ready, select! chooses one of them at random.
                     tokio::select! {
@@ -161,8 +161,7 @@ where
                                     timer = None;
                                     pending_items = 0;
                                 } else {
-                                    // The timer is still running, set it back!
-                                    timer = Some(sleep);
+                                    // The timer is still running.
                                 }
                             }
                             None => {
@@ -170,7 +169,7 @@ where
                                 return;
                             }
                         },
-                        () = &mut sleep => {
+                        () = sleep => {
                             // The batch timer elapsed.
                             // XXX(hdevalence): what span should instrument this?
                             self.flush_service().await;
diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs
index b6eae8b4f..4319e8ee2 100644
--- a/zebra-network/src/peer/connection.rs
+++ b/zebra-network/src/peer/connection.rs
@@ -7,7 +7,7 @@
 //! And it's unclear if these assumptions match the `zcashd` implementation.
 //! It should be refactored into a cleaner set of request/response pairs (#1515).
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, pin::Pin, sync::Arc};
 
 use futures::{
     future::{self, Either},
@@ -320,7 +320,7 @@ pub struct Connection<S, Tx> {
     /// A timeout for a client request. This is stored separately from
     /// State so that we can move the future out of it independently of
     /// other state handling.
-    pub(super) request_timer: Option<Sleep>,
+    pub(super) request_timer: Option<Pin<Box<Sleep>>>,
 
     /// The `inbound` service, used to answer requests from this connection's peer.
     pub(super) svc: S,
@@ -780,11 +780,11 @@ where
                         // send a response before dropping tx.
                         let _ = tx.send(Ok(Response::Nil));
                         self.state = AwaitingRequest;
-                        self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
+                        self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
                     }
                     Ok((new_state @ AwaitingResponse { .. }, None)) => {
                         self.state = new_state;
-                        self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
+                        self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
                     }
                     Err((e, tx)) => {
                         let e = SharedPeerError::from(e);