Fix synchronization delay issue (#2921)
* Create a `NowOrLater` helper type A replacement for `FutureExt::now_or_never` that ensures that the task is scheduled for waking up later when the inner future is ready. * Use `NowOrLater` to fix possible delay bug Previous usage of `now_or_never` meant that the underlying task wasn't being scheduled to awake when the `Downloads` stream produced a new item. Using `NowOrLater` instead fixes that issue.
This commit is contained in:
parent
ffa6fed7b3
commit
595d75d5fb
|
@ -0,0 +1,5 @@
|
|||
//! Extensions used in [`Future`]s and async code.
|
||||
|
||||
mod now_or_later;
|
||||
|
||||
pub use self::now_or_later::NowOrLater;
|
|
@ -0,0 +1,63 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use pin_project::pin_project;
|
||||
|
||||
/// A helper [`Future`] wrapper that will always return [`Poll::Ready`].
|
||||
///
|
||||
/// If the inner [`Future`] `F` is ready and produces an output `value`, then [`NowOrNever`] will
|
||||
/// also be ready but with an output `Some(value)`.
|
||||
///
|
||||
/// If the inner [`Future`] `F` is not ready, then:
|
||||
///
|
||||
/// - [`NowOrNever`] will be still be ready but with an output `None`,
|
||||
/// - and the task associated with the future will be scheduled to awake whenever the inner
|
||||
/// [`Future`] `F` becomes ready.
|
||||
///
|
||||
/// This is different from [`FutureExt::now_or_never`] because `now_or_never` uses a fake task
|
||||
/// [`Context`], which means that calling `now_or_never` inside an `async` function doesn't
|
||||
/// schedule the generated future to be polled again when the inner future becomes ready.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use futures::{FutureExt, future};
|
||||
/// # use zebrad::async_ext::NowOrLater;
|
||||
///
|
||||
/// let inner_future = future::ready(());
|
||||
///
|
||||
/// # let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
|
||||
/// #
|
||||
/// # runtime.block_on(async move {
|
||||
/// assert_eq!(NowOrLater(inner_future).await, Some(()));
|
||||
/// # });
|
||||
/// ```
|
||||
///
|
||||
/// ```
|
||||
/// use futures::{FutureExt, future};
|
||||
/// # use zebrad::async_ext::NowOrLater;
|
||||
///
|
||||
/// let inner_future = future::pending::<()>();
|
||||
///
|
||||
/// # let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
|
||||
/// #
|
||||
/// # runtime.block_on(async move {
|
||||
/// assert_eq!(NowOrLater(inner_future).await, None);
|
||||
/// # });
|
||||
/// ```
|
||||
#[pin_project]
|
||||
pub struct NowOrLater<F>(#[pin] pub F);
|
||||
|
||||
impl<F: Future> Future for NowOrLater<F> {
|
||||
type Output = Option<F::Output>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match self.project().0.poll(context) {
|
||||
Poll::Ready(value) => Poll::Ready(Some(value)),
|
||||
Poll::Pending => Poll::Ready(None),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,10 +1,7 @@
|
|||
use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};
|
||||
|
||||
use color_eyre::eyre::{eyre, Report};
|
||||
use futures::{
|
||||
future::FutureExt,
|
||||
stream::{FuturesUnordered, StreamExt},
|
||||
};
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use tokio::time::sleep;
|
||||
use tower::{
|
||||
builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
|
||||
|
@ -22,7 +19,8 @@ use zebra_network as zn;
|
|||
use zebra_state as zs;
|
||||
|
||||
use crate::{
|
||||
components::sync::downloads::BlockDownloadVerifyError, config::ZebradConfig, BoxError,
|
||||
async_ext::NowOrLater, components::sync::downloads::BlockDownloadVerifyError,
|
||||
config::ZebradConfig, BoxError,
|
||||
};
|
||||
|
||||
mod downloads;
|
||||
|
@ -314,7 +312,7 @@ where
|
|||
|
||||
while !self.prospective_tips.is_empty() {
|
||||
// Check whether any block tasks are currently ready:
|
||||
while let Some(Some(rsp)) = self.downloads.next().now_or_never() {
|
||||
while let Some(Some(rsp)) = NowOrLater(self.downloads.next()).await {
|
||||
match rsp {
|
||||
Ok(hash) => {
|
||||
tracing::trace!(?hash, "verified and committed block to state");
|
||||
|
|
|
@ -39,6 +39,7 @@ pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
|||
mod components;
|
||||
|
||||
pub mod application;
|
||||
pub mod async_ext;
|
||||
pub mod commands;
|
||||
pub mod config;
|
||||
pub mod prelude;
|
||||
|
|
Loading…
Reference in New Issue