Add clonable wrapper around TryRecvError

This commit is contained in:
Jane Lusby 2021-01-27 14:18:26 -08:00 committed by teor
parent 6ffeb670ed
commit 685a592399
1 changed files with 16 additions and 27 deletions

View File

@ -10,6 +10,7 @@ use futures::{
future::{FutureExt, TryFutureExt}, future::{FutureExt, TryFutureExt},
stream::Stream, stream::Stream,
}; };
use oneshot::error::TryRecvError;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt}; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt};
@ -72,9 +73,17 @@ pub enum Setup {
/// ///
/// We keep hold of the closed oneshot, so we can use it to create a /// We keep hold of the closed oneshot, so we can use it to create a
/// new error for each `poll_ready` call. /// new error for each `poll_ready` call.
FailedRecv { FailedRecv { error: SharedRecvError },
failed_setup: oneshot::Receiver<NetworkSetupData>, }
},
#[derive(thiserror::Error, Debug, Clone)]
#[error(transparent)]
pub struct SharedRecvError(Arc<TryRecvError>);
impl From<TryRecvError> for SharedRecvError {
fn from(source: TryRecvError) -> Self {
Self(Arc::new(source))
}
} }
/// Uses the node state to respond to inbound peer requests. /// Uses the node state to respond to inbound peer requests.
@ -131,8 +140,6 @@ impl Service<zn::Request> for Inbound {
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
use oneshot::error::TryRecvError;
// Check whether the network setup is finished, but don't wait for it to // Check whether the network setup is finished, but don't wait for it to
// become ready before reporting readiness. We expect to get it "soon", // become ready before reporting readiness. We expect to get it "soon",
// and reporting unreadiness might cause unwanted load-shedding, since // and reporting unreadiness might cause unwanted load-shedding, since
@ -169,8 +176,9 @@ impl Service<zn::Request> for Inbound {
Err(error @ TryRecvError::Closed) => { Err(error @ TryRecvError::Closed) => {
// Mark the service as failed, because network setup failed // Mark the service as failed, because network setup failed
error!(?error, "inbound network setup failed"); error!(?error, "inbound network setup failed");
let error: SharedRecvError = error.into();
self.network = Setup::FailedRecv { self.network = Setup::FailedRecv {
failed_setup: network_setup, error: error.clone(),
}; };
return Poll::Ready(Err(error.into())); return Poll::Ready(Err(error.into()));
} }
@ -187,27 +195,8 @@ impl Service<zn::Request> for Inbound {
} }
// If network setup failed, report service failure // If network setup failed, report service failure
if let Setup::FailedRecv { failed_setup } = &mut self.network { if let Setup::FailedRecv { error } = &mut self.network {
// TryRecvError is not cloneable, so we have to generate a new error from the oneshot, return Poll::Ready(Err(error.clone().into()));
// rather than re-using a clone of the original error
let failed_response = failed_setup.try_recv();
match failed_response {
Err(error @ TryRecvError::Closed) => {
return Poll::Ready(Err(error.into()));
}
Err(error) => {
unreachable!(
"unexpected error kind from failed Inbound network setup oneshot: {:?}",
error
);
}
Ok(_) => {
// we can't log the response, because it doesn't impl Debug
unreachable!(
"unexpected success response from failed Inbound network setup oneshot"
);
}
}
} }
// Clean up completed download tasks, ignoring their results // Clean up completed download tasks, ignoring their results