Fix state readiness hangs for Inbound
Use `ServiceExt::oneshot` to perform state requests. Explain that `ServiceExt::call_all` calls `poll_ready` internally. Document a state service invariant imposed by `ServiceExt::call_all`.
This commit is contained in:
parent
4d1a2fd02e
commit
64bc45cd2e
|
@ -530,6 +530,13 @@ impl Service<Request> for StateService {
|
||||||
type Future =
|
type Future =
|
||||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||||
|
|
||||||
|
// ## Correctness:
|
||||||
|
//
|
||||||
|
// This function must not return Poll::Pending, unless:
|
||||||
|
// 1. We remove all instances of `call_all` on the state service, or fix the leaked
|
||||||
|
// service reservation in the `CallAll` implementation:
|
||||||
|
// https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112
|
||||||
|
// 2. We schedule the current task for wakeup via the `Context`
|
||||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
|
||||||
|
|
|
@ -52,6 +52,12 @@ pub type SetupData = (Outbound, Arc<Mutex<AddressBook>>);
|
||||||
/// behind the current tip, while the `Inbound` service is *externally driven*,
|
/// behind the current tip, while the `Inbound` service is *externally driven*,
|
||||||
/// responding to block gossip by attempting to download and validate advertised
|
/// responding to block gossip by attempting to download and validate advertised
|
||||||
/// blocks.
|
/// blocks.
|
||||||
|
///
|
||||||
|
/// ## Correctness
|
||||||
|
///
|
||||||
|
/// The `state` service must not return `Poll::Pending`. If it does, a bug in the
|
||||||
|
/// `ServiceExt::call_all` implementation might cause the `state` buffer to fill
|
||||||
|
/// up, and make Zebra hang.
|
||||||
pub struct Inbound {
|
pub struct Inbound {
|
||||||
// invariants:
|
// invariants:
|
||||||
// * Before setup: address_book and downloads are None, and the *_setup members are Some
|
// * Before setup: address_book and downloads are None, and the *_setup members are Some
|
||||||
|
@ -153,17 +159,20 @@ impl Service<zn::Request> for Inbound {
|
||||||
while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {}
|
while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now report readiness based on readiness of the inner services, if they're available.
|
// TODO:
|
||||||
//
|
// * do we want to propagate backpressure from the download queue or its outbound network here?
|
||||||
// TODO: do we want to propagate backpressure from the download queue or its outbound network here?
|
|
||||||
// currently, the download queue waits for the outbound network in the download future, and
|
// currently, the download queue waits for the outbound network in the download future, and
|
||||||
// drops new requests after it reaches a hard-coded limit. This is the "load shed directly"
|
// drops new requests after it reaches a hard-coded limit. This is the "load shed directly"
|
||||||
// pattern from #1618.
|
// pattern from #1618.
|
||||||
match self.state.poll_ready(cx) {
|
// * if we want to propagate backpressure, add a ReadyCache to ensure that each poll_ready
|
||||||
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
// has a matching call. See #1593 for details.
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
|
// Ignore state readiness, to avoid reserving its buffer slots.
|
||||||
}
|
// We can't use a state ReadyCache, because call_all uses state directly.
|
||||||
|
// We can't call state.poll_ready, because:
|
||||||
|
// * call_all also calls poll_ready
|
||||||
|
// * some requests don't use the state
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(name = "inbound", skip(self, req))]
|
#[instrument(name = "inbound", skip(self, req))]
|
||||||
|
@ -183,6 +192,14 @@ impl Service<zn::Request> for Inbound {
|
||||||
None => async { Err("not ready to serve addresses".into()) }.boxed(),
|
None => async { Err("not ready to serve addresses".into()) }.boxed(),
|
||||||
},
|
},
|
||||||
zn::Request::BlocksByHash(hashes) => {
|
zn::Request::BlocksByHash(hashes) => {
|
||||||
|
// Correctness:
|
||||||
|
//
|
||||||
|
// We don't need to use ServiceExt::oneshot here, because
|
||||||
|
// call_all uses poll_ready internally.
|
||||||
|
//
|
||||||
|
// The state must not return Poll::Pending, because call_all
|
||||||
|
// leaks a buffer reservation every time that happens
|
||||||
|
// https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112
|
||||||
let state = self.state.clone();
|
let state = self.state.clone();
|
||||||
let requests = futures::stream::iter(
|
let requests = futures::stream::iter(
|
||||||
hashes
|
hashes
|
||||||
|
@ -219,7 +236,7 @@ impl Service<zn::Request> for Inbound {
|
||||||
}
|
}
|
||||||
zn::Request::FindBlocks { known_blocks, stop } => {
|
zn::Request::FindBlocks { known_blocks, stop } => {
|
||||||
let request = zs::Request::FindBlockHashes { known_blocks, stop };
|
let request = zs::Request::FindBlockHashes { known_blocks, stop };
|
||||||
self.state.call(request).map_ok(|resp| match resp {
|
self.state.clone().oneshot(request).map_ok(|resp| match resp {
|
||||||
zs::Response::BlockHashes(hashes) if hashes.is_empty() => zn::Response::Nil,
|
zs::Response::BlockHashes(hashes) if hashes.is_empty() => zn::Response::Nil,
|
||||||
zs::Response::BlockHashes(hashes) => zn::Response::BlockHashes(hashes),
|
zs::Response::BlockHashes(hashes) => zn::Response::BlockHashes(hashes),
|
||||||
_ => unreachable!("zebra-state should always respond to a `FindBlockHashes` request with a `BlockHashes` response"),
|
_ => unreachable!("zebra-state should always respond to a `FindBlockHashes` request with a `BlockHashes` response"),
|
||||||
|
@ -228,7 +245,7 @@ impl Service<zn::Request> for Inbound {
|
||||||
}
|
}
|
||||||
zn::Request::FindHeaders { known_blocks, stop } => {
|
zn::Request::FindHeaders { known_blocks, stop } => {
|
||||||
let request = zs::Request::FindBlockHeaders { known_blocks, stop };
|
let request = zs::Request::FindBlockHeaders { known_blocks, stop };
|
||||||
self.state.call(request).map_ok(|resp| match resp {
|
self.state.clone().oneshot(request).map_ok(|resp| match resp {
|
||||||
zs::Response::BlockHeaders(headers) if headers.is_empty() => zn::Response::Nil,
|
zs::Response::BlockHeaders(headers) if headers.is_empty() => zn::Response::Nil,
|
||||||
zs::Response::BlockHeaders(headers) => zn::Response::BlockHeaders(headers),
|
zs::Response::BlockHeaders(headers) => zn::Response::BlockHeaders(headers),
|
||||||
_ => unreachable!("zebra-state should always respond to a `FindBlockHeaders` request with a `BlockHeaders` response"),
|
_ => unreachable!("zebra-state should always respond to a `FindBlockHeaders` request with a `BlockHeaders` response"),
|
||||||
|
|
Loading…
Reference in New Issue