//! Future types for the `Batch` middleware. use super::{error::Closed, message}; use futures_core::ready; use pin_project::pin_project; use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; /// Future that completes when the batch processing is complete. #[pin_project] #[derive(Debug)] pub struct ResponseFuture { #[pin] state: ResponseState, } #[pin_project(project = ResponseStateProj)] #[derive(Debug)] enum ResponseState { Failed(Option), Rx(#[pin] message::Rx), Poll(#[pin] T), } impl ResponseFuture { pub(crate) fn new(rx: message::Rx) -> Self { ResponseFuture { state: ResponseState::Rx(rx), } } pub(crate) fn failed(err: crate::BoxError) -> Self { ResponseFuture { state: ResponseState::Failed(Some(err)), } } } impl Future for ResponseFuture where F: Future>, E: Into, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); // CORRECTNESS // // The current task must be scheduled for wakeup every time we return // `Poll::Pending`. // // This loop ensures that the task is scheduled as required, because it // only returns Pending when another future returns Pending. loop { match this.state.as_mut().project() { ResponseStateProj::Failed(e) => { return Poll::Ready(Err(e.take().expect("polled after error"))); } ResponseStateProj::Rx(rx) => match ready!(rx.poll(cx)) { Ok(Ok(f)) => this.state.set(ResponseState::Poll(f)), Ok(Err(e)) => return Poll::Ready(Err(e.into())), Err(_) => return Poll::Ready(Err(Closed::new().into())), }, ResponseStateProj::Poll(fut) => return fut.poll(cx).map_err(Into::into), } } } }