diff --git a/tower-batch/src/future.rs b/tower-batch/src/future.rs index ed96ce3fc..2d7107f70 100644 --- a/tower-batch/src/future.rs +++ b/tower-batch/src/future.rs @@ -49,6 +49,13 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); + // CORRECTNESS + // + // The current task must be scheduled for wakeup every time we return + // `Poll::Pending`. + // + // This loop ensures that the task is scheduled as required, because it + // only returns Pending when another future returns Pending. loop { match this.state.as_mut().project() { ResponseStateProj::Failed(e) => { diff --git a/tower-batch/src/service.rs b/tower-batch/src/service.rs index 7f99aa195..2cc7b3740 100644 --- a/tower-batch/src/service.rs +++ b/tower-batch/src/service.rs @@ -134,12 +134,19 @@ where return Poll::Ready(Err(self.get_worker_error())); } - // Then, poll to acquire a semaphore permit. If we acquire a permit, - // then there's enough buffer capacity to send a new request. Otherwise, - // we need to wait for capacity. - - // In tokio 0.3.7, `acquire_owned` panics if its semaphore returns an error, - // so we don't need to handle errors until we upgrade to tokio 1.0. + // CORRECTNESS + // + // Poll to acquire a semaphore permit. If we acquire a permit, then + // there's enough buffer capacity to send a new request. Otherwise, we + // need to wait for capacity. + // + // In tokio 0.3.7, `acquire_owned` panics if its semaphore returns an + // error, so we don't need to handle errors until we upgrade to + // tokio 1.0. + // + // The current task must be scheduled for wakeup every time we return + // `Poll::Pending`. If it returns Pending, the semaphore also schedules + // the task for wakeup when the next permit is available. ready!(self.semaphore.poll_acquire(cx)); Poll::Ready(Ok(())) diff --git a/tower-fallback/src/future.rs b/tower-fallback/src/future.rs index 59019acf0..92dd9ea9a 100644 --- a/tower-fallback/src/future.rs +++ b/tower-fallback/src/future.rs @@ -74,6 +74,13 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); + // CORRECTNESS + // + // The current task must be scheduled for wakeup every time we return + // `Poll::Pending`. + // + // This loop ensures that the task is scheduled as required, because it + // only returns Pending when a future or service returns Pending. loop { match this.state.as_mut().project() { ResponseStateProj::PollResponse1 { fut, .. } => match ready!(fut.poll(cx)) { diff --git a/zebra-consensus/src/chain.rs b/zebra-consensus/src/chain.rs index 3f4160ac5..7303aaf53 100644 --- a/zebra-consensus/src/chain.rs +++ b/zebra-consensus/src/chain.rs @@ -71,7 +71,13 @@ where Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - // Correctness: + // CORRECTNESS + // + // The current task must be scheduled for wakeup every time we return + // `Poll::Pending`. + // + // If either verifier is unready, this task is scheduled for wakeup when it becomes + // ready. // // We acquire checkpoint readiness before block readiness, to avoid an unlikely // hang during the checkpoint to block verifier transition. If the checkpoint and diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 264fa26c6..bf26a2b5a 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -113,7 +113,13 @@ impl Stream for ClientRequestReceiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.inner.poll_next_unpin(cx) { Poll::Ready(client_request) => Poll::Ready(client_request.map(Into::into)), - // `inner.poll_next_unpin` parks the task for this future + // CORRECTNESS + // + // The current task must be scheduled for wakeup every time we + // return `Poll::Pending`. + // + // inner.poll_next_unpin` schedules this task for wakeup when + // there are new items available in the inner stream. Poll::Pending => Poll::Pending, } } @@ -198,6 +204,19 @@ impl Service for Client { Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // CORRECTNESS + // + // The current task must be scheduled for wakeup every time we return + // `Poll::Pending`. + // + //`ready!` returns `Poll::Pending` when `server_tx` is unready, and + // schedules this task for wakeup. + // + // Since `shutdown_tx` is used for oneshot communication to the heartbeat + // task, it will never be `Pending`. + // + // TODO: should the Client exit if the heartbeat task exits and drops + // `shutdown_tx`? if ready!(self.server_tx.poll_ready(cx)).is_err() { Poll::Ready(Err(self .error_slot diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index d87016166..95d2b57cc 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -464,6 +464,25 @@ where if self.preselected_p2c_index.is_none() { trace!("no ready services, sending demand signal"); let _ = self.demand_signal.try_send(()); + // CORRECTNESS + // + // The current task must be scheduled for wakeup every time we + // return `Poll::Pending`. + // + // As long as there are unready or new peers, this task will run, + // because: + // - `poll_discover` schedules this task for wakeup when new + // peers arrive. + // - if there are unready peers, `poll_unready` schedules this + // task for wakeup when peer services become ready. + // - if the preselected peer is not ready, `service.poll_ready` + // schedules this task for wakeup when that service becomes + // ready. + // + // To avoid peers blocking on a full background error channel: + // - if no background tasks have exited since the last poll, + // `poll_background_errors` schedules this task for wakeup when + // the next task exits. return Poll::Pending; } } diff --git a/zebra-network/src/peer_set/unready_service.rs b/zebra-network/src/peer_set/unready_service.rs index 9323de307..602198040 100644 --- a/zebra-network/src/peer_set/unready_service.rs +++ b/zebra-network/src/peer_set/unready_service.rs @@ -40,6 +40,15 @@ impl, Req> Future for UnreadyService { return Poll::Ready(Err((key, Error::Canceled))); } + // CORRECTNESS + // + // The current task must be scheduled for wakeup every time we return + // `Poll::Pending`. + // + //`ready!` returns `Poll::Pending` when the service is unready, and + // schedules this task for wakeup. + // + // `cancel.poll` also schedules this task for wakeup if it is canceled. let res = ready!(this .service .as_mut() diff --git a/zebrad/src/components/inbound/downloads.rs b/zebrad/src/components/inbound/downloads.rs index 54e2bc901..2ce522aa1 100644 --- a/zebrad/src/components/inbound/downloads.rs +++ b/zebrad/src/components/inbound/downloads.rs @@ -108,6 +108,15 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.project(); + // CORRECTNESS + // + // The current task must be scheduled for wakeup every time we return + // `Poll::Pending`. + // + // If no download and verify tasks have exited since the last poll, this + // task is scheduled for wakeup when the next task becomes ready. + // + // TODO: // This would be cleaner with poll_map #63514, but that's nightly only. if let Some(join_result) = ready!(this.pending.poll_next(cx)) { match join_result.expect("block download and verify tasks must not panic") { diff --git a/zebrad/src/components/sync/downloads.rs b/zebrad/src/components/sync/downloads.rs index 01f818177..11f7ab6dd 100644 --- a/zebrad/src/components/sync/downloads.rs +++ b/zebrad/src/components/sync/downloads.rs @@ -72,6 +72,15 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.project(); + // CORRECTNESS + // + // The current task must be scheduled for wakeup every time we return + // `Poll::Pending`. + // + // If no download and verify tasks have exited since the last poll, this + // task is scheduled for wakeup when the next task becomes ready. + // + // TODO: // This would be cleaner with poll_map #63514, but that's nightly only. if let Some(join_result) = ready!(this.pending.poll_next(cx)) { match join_result.expect("block download and verify tasks must not panic") {