diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 5310126aa..e99d628af 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -530,6 +530,13 @@ impl Service for StateService { type Future = Pin> + Send + 'static>>; + // ## Correctness: + // + // This function must not return Poll::Pending, unless: + // 1. We remove all instances of `call_all` on the state service, or fix the leaked + // service reservation in the `CallAll` implementation: + // https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112 + // 2. We schedule the current task for wakeup via the `Context` fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { let now = Instant::now(); diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 3fdb6126b..7041d66bc 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -52,6 +52,12 @@ pub type SetupData = (Outbound, Arc>); /// behind the current tip, while the `Inbound` service is *externally driven*, /// responding to block gossip by attempting to download and validate advertised /// blocks. +/// +/// ## Correctness +/// +/// The `state` service must not return `Poll::Pending`. If it does, a bug in the +/// `ServiceExt::call_all` implementation might cause the `state` buffer to fill +/// up, and make Zebra hang. pub struct Inbound { // invariants: // * Before setup: address_book and downloads are None, and the *_setup members are Some @@ -153,17 +159,20 @@ impl Service for Inbound { while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {} } - // Now report readiness based on readiness of the inner services, if they're available. - // - // TODO: do we want to propagate backpressure from the download queue or its outbound network here? - // currently, the download queue waits for the outbound network in the download future, and - // drops new requests after it reaches a hard-coded limit. 
This is the "load shed directly" - // pattern from #1618. - match self.state.poll_ready(cx) { - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - } + // TODO: + // * do we want to propagate backpressure from the download queue or its outbound network here? + // currently, the download queue waits for the outbound network in the download future, and + // drops new requests after it reaches a hard-coded limit. This is the "load shed directly" + // pattern from #1618. + // * if we want to propagate backpressure, add a ReadyCache to ensure that each poll_ready + // has a matching call. See #1593 for details. + + // Ignore state readiness, to avoid reserving its buffer slots. + // We can't use a state ReadyCache, because call_all uses state directly. + // We can't call state.poll_ready, because: + // * call_all also calls poll_ready + // * some requests don't use the state + Poll::Ready(Ok(())) } #[instrument(name = "inbound", skip(self, req))] @@ -183,6 +192,14 @@ impl Service for Inbound { None => async { Err("not ready to serve addresses".into()) }.boxed(), }, zn::Request::BlocksByHash(hashes) => { + // Correctness: + // + // We don't need to use ServiceExt::oneshot here, because + // call_all uses poll_ready internally. 
+ // + // The state must not return Poll::Pending, because call_all + // leaks a buffer reservation every time that happens: + // https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112 let state = self.state.clone(); let requests = futures::stream::iter( hashes @@ -219,7 +236,7 @@ impl Service for Inbound { } zn::Request::FindBlocks { known_blocks, stop } => { let request = zs::Request::FindBlockHashes { known_blocks, stop }; - self.state.call(request).map_ok(|resp| match resp { + self.state.clone().oneshot(request).map_ok(|resp| match resp { zs::Response::BlockHashes(hashes) if hashes.is_empty() => zn::Response::Nil, zs::Response::BlockHashes(hashes) => zn::Response::BlockHashes(hashes), _ => unreachable!("zebra-state should always respond to a `FindBlockHashes` request with a `BlockHashes` response"), @@ -228,7 +245,7 @@ impl Service for Inbound { } zn::Request::FindHeaders { known_blocks, stop } => { let request = zs::Request::FindBlockHeaders { known_blocks, stop }; - self.state.call(request).map_ok(|resp| match resp { + self.state.clone().oneshot(request).map_ok(|resp| match resp { zs::Response::BlockHeaders(headers) if headers.is_empty() => zn::Response::Nil, zs::Response::BlockHeaders(headers) => zn::Response::BlockHeaders(headers), _ => unreachable!("zebra-state should always respond to a `FindBlockHeaders` request with a `BlockHeaders` response"),