zebrad: make Inbound propagate backpressure
This commit is contained in:
parent
55f46967b2
commit
65877cb4b1
|
@ -50,10 +50,11 @@ impl Service<zn::Request> for Inbound {
|
||||||
type Future =
|
type Future =
|
||||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||||
|
|
||||||
#[instrument(skip(self, _cx))]
|
#[instrument(skip(self, cx))]
|
||||||
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
use oneshot::error::TryRecvError;
|
use oneshot::error::TryRecvError;
|
||||||
match self.network_setup.take() {
|
match self.network_setup.take() {
|
||||||
|
// If we're waiting for setup, check if we became ready
|
||||||
Some(mut rx) => match rx.try_recv() {
|
Some(mut rx) => match rx.try_recv() {
|
||||||
Ok((outbound, address_book)) => {
|
Ok((outbound, address_book)) => {
|
||||||
self.outbound = Some(outbound);
|
self.outbound = Some(outbound);
|
||||||
|
@ -73,7 +74,17 @@ impl Service<zn::Request> for Inbound {
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
None => Poll::Ready(Ok(())),
|
// Otherwise, check readiness of services we might call to propagate backpressure.
|
||||||
|
None => {
|
||||||
|
match (
|
||||||
|
self.state.poll_ready(cx),
|
||||||
|
self.outbound.as_mut().unwrap().poll_ready(cx),
|
||||||
|
) {
|
||||||
|
(Poll::Ready(Err(e)), _) | (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
|
||||||
|
(Poll::Pending, _) | (_, Poll::Pending) => Poll::Pending,
|
||||||
|
(Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue