combine network setup into an exhaustive match

This commit is contained in:
Jane Lusby 2021-01-27 15:09:40 -08:00 committed by teor
parent 4d6ef89248
commit 4cf331562c
1 changed files with 55 additions and 54 deletions

View File

@ -10,6 +10,7 @@ use futures::{
future::{FutureExt, TryFutureExt},
stream::Stream,
};
use mem::swap;
use oneshot::error::TryRecvError;
use tokio::sync::oneshot;
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt};
@ -131,6 +132,12 @@ impl Inbound {
state,
}
}
fn take_setup(&mut self) -> Setup {
let mut network = Setup::FailedInit;
std::mem::swap(&mut self.network, &mut network);
network
}
}
impl Service<zn::Request> for Inbound {
@ -145,64 +152,58 @@ impl Service<zn::Request> for Inbound {
// and reporting unreadiness might cause unwanted load-shedding, since
// the load-shed middleware is unable to distinguish being unready due
// to load from being unready while waiting on setup.
if matches!(self.network, Setup::AwaitingNetwork { .. }) {
// Unfortunately, we can't match, swap, and destructure at the same time
let mut awaiting_state = Setup::FailedInit;
mem::swap(&mut self.network, &mut awaiting_state);
if let Setup::AwaitingNetwork {
let mut result = Ok(());
self.network = match self.take_setup() {
Setup::AwaitingNetwork {
mut network_setup,
verifier,
} = awaiting_state
{
match network_setup.try_recv() {
Ok((outbound, address_book)) => {
let downloads = Box::pin(Downloads::new(
Timeout::new(outbound, BLOCK_DOWNLOAD_TIMEOUT),
Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT),
self.state.clone(),
));
self.network = Setup::Initialized {
address_book,
downloads,
};
}
Err(TryRecvError::Empty) => {
// There's no setup data yet, so keep waiting for it
self.network = Setup::AwaitingNetwork {
network_setup,
verifier,
};
}
Err(error @ TryRecvError::Closed) => {
// Mark the service as failed, because network setup failed
error!(?error, "inbound network setup failed");
let error: SharedRecvError = error.into();
self.network = Setup::FailedRecv {
error: error.clone(),
};
return Poll::Ready(Err(error.into()));
} => match network_setup.try_recv() {
Ok((outbound, address_book)) => {
let downloads = Box::pin(Downloads::new(
Timeout::new(outbound, BLOCK_DOWNLOAD_TIMEOUT),
Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT),
self.state.clone(),
));
Setup::Initialized {
address_book,
downloads,
}
}
Err(TryRecvError::Empty) => {
// There's no setup data yet, so keep waiting for it
Setup::AwaitingNetwork {
network_setup,
verifier,
}
}
Err(error @ TryRecvError::Closed) => {
// Mark the service as failed, because network setup failed
error!(?error, "inbound network setup failed");
let error: SharedRecvError = error.into();
result = Err(error.clone().into());
Setup::FailedRecv { error }
}
},
// Make sure we left the network setup in a valid state
Setup::FailedInit => unreachable!("incomplete Inbound initialization"),
// If network setup failed, report service failure
Setup::FailedRecv { error } => {
result = Err(error.clone().into());
Setup::FailedRecv { error }
}
}
// Unfortunately, we can't combine these matches into an exhaustive match statement,
// because they use mutable references, or they depend on the state we've just modified.
// Make sure we left the network setup in a valid state
if matches!(self.network, Setup::FailedInit) {
unreachable!("incomplete Inbound initialization");
}
// If network setup failed, report service failure
if let Setup::FailedRecv { error } = &mut self.network {
return Poll::Ready(Err(error.clone().into()));
}
// Clean up completed download tasks, ignoring their results
if let Setup::Initialized { downloads, .. } = &mut self.network {
while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {}
}
// Clean up completed download tasks, ignoring their results
Setup::Initialized {
address_book,
mut downloads,
} => {
while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {}
Setup::Initialized {
address_book,
downloads,
}
}
};
// TODO:
// * do we want to propagate backpressure from the download queue or its outbound network?
@ -213,7 +214,7 @@ impl Service<zn::Request> for Inbound {
// So we might also want to propagate backpressure from its buffer.
// * if we want to propagate backpressure, add a ReadyCache for each service, to ensure
// that each poll_ready has a matching call. See #1593 for details.
Poll::Ready(Ok(()))
Poll::Ready(result)
}
#[instrument(name = "inbound", skip(self, req))]