Manually pin `Sleep` futures (#2914)
* Wrap `Sleep` timer in a `Pin<Box<_>>` The `Sleep` type doesn't implement `Unpin` in newer versions of Tokio. * Wrap `Sleep` type in a `Pin<Box<_>>` In newer Tokio versions the `Sleep` type doesn't implement `Unpin`, so it needs to be manually pinned.
This commit is contained in:
parent
ae6c90f914
commit
2a1d4281c5
|
@ -123,10 +123,10 @@ where
|
||||||
// submitted, so that the batch latency of all entries is at most
|
// submitted, so that the batch latency of all entries is at most
|
||||||
// self.max_latency. However, we don't keep the timer running unless
|
// self.max_latency. However, we don't keep the timer running unless
|
||||||
// there is a pending request to prevent wakeups on idle services.
|
// there is a pending request to prevent wakeups on idle services.
|
||||||
let mut timer: Option<Sleep> = None;
|
let mut timer: Option<Pin<Box<Sleep>>> = None;
|
||||||
let mut pending_items = 0usize;
|
let mut pending_items = 0usize;
|
||||||
loop {
|
loop {
|
||||||
match timer {
|
match timer.as_mut() {
|
||||||
None => match self.rx.next().await {
|
None => match self.rx.next().await {
|
||||||
// The first message in a new batch.
|
// The first message in a new batch.
|
||||||
Some(msg) => {
|
Some(msg) => {
|
||||||
|
@ -135,13 +135,13 @@ where
|
||||||
// Apply the provided span to request processing
|
// Apply the provided span to request processing
|
||||||
.instrument(span)
|
.instrument(span)
|
||||||
.await;
|
.await;
|
||||||
timer = Some(sleep(self.max_latency));
|
timer = Some(Box::pin(sleep(self.max_latency)));
|
||||||
pending_items = 1;
|
pending_items = 1;
|
||||||
}
|
}
|
||||||
// No more messages, ever.
|
// No more messages, ever.
|
||||||
None => return,
|
None => return,
|
||||||
},
|
},
|
||||||
Some(mut sleep) => {
|
Some(sleep) => {
|
||||||
// Wait on either a new message or the batch timer.
|
// Wait on either a new message or the batch timer.
|
||||||
// If both are ready, select! chooses one of them at random.
|
// If both are ready, select! chooses one of them at random.
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
@ -161,8 +161,7 @@ where
|
||||||
timer = None;
|
timer = None;
|
||||||
pending_items = 0;
|
pending_items = 0;
|
||||||
} else {
|
} else {
|
||||||
// The timer is still running, set it back!
|
// The timer is still running.
|
||||||
timer = Some(sleep);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
|
@ -170,7 +169,7 @@ where
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
() = &mut sleep => {
|
() = sleep => {
|
||||||
// The batch timer elapsed.
|
// The batch timer elapsed.
|
||||||
// XXX(hdevalence): what span should instrument this?
|
// XXX(hdevalence): what span should instrument this?
|
||||||
self.flush_service().await;
|
self.flush_service().await;
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
//! And it's unclear if these assumptions match the `zcashd` implementation.
|
//! And it's unclear if these assumptions match the `zcashd` implementation.
|
||||||
//! It should be refactored into a cleaner set of request/response pairs (#1515).
|
//! It should be refactored into a cleaner set of request/response pairs (#1515).
|
||||||
|
|
||||||
use std::{collections::HashSet, sync::Arc};
|
use std::{collections::HashSet, pin::Pin, sync::Arc};
|
||||||
|
|
||||||
use futures::{
|
use futures::{
|
||||||
future::{self, Either},
|
future::{self, Either},
|
||||||
|
@ -320,7 +320,7 @@ pub struct Connection<S, Tx> {
|
||||||
/// A timeout for a client request. This is stored separately from
|
/// A timeout for a client request. This is stored separately from
|
||||||
/// State so that we can move the future out of it independently of
|
/// State so that we can move the future out of it independently of
|
||||||
/// other state handling.
|
/// other state handling.
|
||||||
pub(super) request_timer: Option<Sleep>,
|
pub(super) request_timer: Option<Pin<Box<Sleep>>>,
|
||||||
|
|
||||||
/// The `inbound` service, used to answer requests from this connection's peer.
|
/// The `inbound` service, used to answer requests from this connection's peer.
|
||||||
pub(super) svc: S,
|
pub(super) svc: S,
|
||||||
|
@ -780,11 +780,11 @@ where
|
||||||
// send a response before dropping tx.
|
// send a response before dropping tx.
|
||||||
let _ = tx.send(Ok(Response::Nil));
|
let _ = tx.send(Ok(Response::Nil));
|
||||||
self.state = AwaitingRequest;
|
self.state = AwaitingRequest;
|
||||||
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
|
self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
|
||||||
}
|
}
|
||||||
Ok((new_state @ AwaitingResponse { .. }, None)) => {
|
Ok((new_state @ AwaitingResponse { .. }, None)) => {
|
||||||
self.state = new_state;
|
self.state = new_state;
|
||||||
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
|
self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
|
||||||
}
|
}
|
||||||
Err((e, tx)) => {
|
Err((e, tx)) => {
|
||||||
let e = SharedPeerError::from(e);
|
let e = SharedPeerError::from(e);
|
||||||
|
|
Loading…
Reference in New Issue