diff --git a/tower-balance/src/error.rs b/tower-balance/src/error.rs new file mode 100644 index 0000000..9fa7959 --- /dev/null +++ b/tower-balance/src/error.rs @@ -0,0 +1,18 @@ +use std::fmt; + +pub(crate) type Error = Box; + +#[derive(Debug)] +pub struct Balance(pub(crate) Error); + +impl fmt::Display for Balance { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "load balancing discover error: {}", self.0) + } +} + +impl std::error::Error for Balance { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&*self.0) + } +} diff --git a/tower-balance/src/lib.rs b/tower-balance/src/lib.rs index cdd4f67..0b40704 100644 --- a/tower-balance/src/lib.rs +++ b/tower-balance/src/lib.rs @@ -13,16 +13,18 @@ extern crate tower_service; use futures::{Async, Future, Poll}; use indexmap::IndexMap; use rand::{rngs::SmallRng, SeedableRng}; -use std::marker::PhantomData; -use std::{error, fmt}; +use std::fmt; use tower_discover::Discover; use tower_service::Service; pub mod choose; +pub mod error; pub mod load; -pub use choose::Choose; -pub use load::Load; +pub use self::choose::Choose; +pub use self::load::Load; + +use self::error::Error; /// Balances requests across a set of inner services. #[derive(Debug)] @@ -48,14 +50,7 @@ pub struct Balance { not_ready: IndexMap, } -/// Error produced by `Balance` -#[derive(Debug)] -pub enum Error { - Inner(T), - Balance(U), -} - -pub struct ResponseFuture(F, PhantomData); +pub struct ResponseFuture(F); // ===== impl Balance ===== @@ -146,15 +141,24 @@ where pub fn num_not_ready(&self) -> usize { self.not_ready.len() } +} +impl Balance +where + D: Discover, + D::Error: Into, + C: Choose, +{ /// Polls `discover` for updates, adding new items to `not_ready`. /// /// Removals may alter the order of either `ready` or `not_ready`. - fn update_from_discover(&mut self) -> Result<(), Error> { + fn update_from_discover(&mut self) -> Result<(), error::Balance> { debug!("updating from discover"); use tower_discover::Change::*; - while let Async::Ready(change) = self.discover.poll().map_err(Error::Balance)? { + while let Async::Ready(change) = + self.discover.poll().map_err(|e| error::Balance(e.into()))? + { match change { Insert(key, mut svc) => { // If the `Insert`ed service is a duplicate of a service already @@ -184,9 +188,7 @@ where /// /// When `poll_ready` returns ready, the service is removed from `not_ready` and inserted /// into `ready`, potentially altering the order of `ready` and/or `not_ready`. - fn promote_to_ready( - &mut self, - ) -> Result<(), Error<>::Error, D::Error>> + fn promote_to_ready(&mut self) -> Result<(), >::Error> where D::Service: Service, { @@ -205,7 +207,7 @@ where .not_ready .get_index_mut(idx) .expect("invalid not_ready index");; - svc.poll_ready().map_err(Error::Inner)?.is_ready() + svc.poll_ready()?.is_ready() }; trace!("not_ready[{:?}]: is_ready={:?};", idx, is_ready); if is_ready { @@ -232,7 +234,7 @@ where fn poll_ready_index( &mut self, idx: usize, - ) -> Option>::Error, D::Error>>> + ) -> Option>::Error>> where D::Service: Service, { @@ -240,7 +242,7 @@ where None => return None, Some((_, svc)) => match svc.poll_ready() { Ok(Async::Ready(())) => return Some(Ok(Async::Ready(()))), - Err(e) => return Some(Err(Error::Inner(e))), + Err(e) => return Some(Err(e)), Ok(Async::NotReady) => {} }, } @@ -258,7 +260,7 @@ where /// Ensures that . fn choose_and_poll_ready( &mut self, - ) -> Poll<(), Error<>::Error, D::Error>> + ) -> Poll<(), >::Error> where D::Service: Service, { @@ -287,15 +289,17 @@ where } } -impl Service for Balance +impl Service for Balance where - D: Discover, - D::Service: Service, - C: Choose, + D: Discover, + D::Error: Into, + Svc: Service, + Svc::Error: Into, + C: Choose, { - type Response = >::Response; - type Error = Error<>::Error, D::Error>; - type Future = ResponseFuture<>::Future, D::Error>; + type Response = Svc::Response; + type Error = Error; + type Future = ResponseFuture; /// Prepares the balancer to process a request. /// @@ -310,15 +314,16 @@ where if let Some(idx) = self.dispatched_ready_index.take() { // XXX Should we handle per-endpoint errors? self.poll_ready_index(idx) - .expect("invalid dispatched ready key")?; + .expect("invalid dispatched ready key") + .map_err(Into::into)?; } // Update `not_ready` and `ready`. self.update_from_discover()?; - self.promote_to_ready()?; + self.promote_to_ready().map_err(Into::into)?; // Choose the next service to be used by `call`. - self.choose_and_poll_ready() + self.choose_and_poll_ready().map_err(Into::into) } fn call(&mut self, request: Request) -> Self::Future { @@ -330,53 +335,22 @@ where self.dispatched_ready_index = Some(idx); let rsp = svc.call(request); - ResponseFuture(rsp, PhantomData) + ResponseFuture(rsp) } } // ===== impl ResponseFuture ===== -impl Future for ResponseFuture { +impl Future for ResponseFuture +where + F: Future, + F::Error: Into, +{ type Item = F::Item; - type Error = Error; + type Error = Error; fn poll(&mut self) -> Poll { - self.0.poll().map_err(Error::Inner) - } -} - -// ===== impl Error ===== - -impl fmt::Display for Error -where - T: fmt::Display, - U: fmt::Display, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - Error::Inner(ref why) => fmt::Display::fmt(why, f), - Error::Balance(ref why) => write!(f, "load balancing failed: {}", why), - } - } -} - -impl error::Error for Error -where - T: error::Error, - U: error::Error, -{ - fn cause(&self) -> Option<&error::Error> { - match *self { - Error::Inner(ref why) => Some(why), - Error::Balance(ref why) => Some(why), - } - } - - fn description(&self) -> &str { - match *self { - Error::Inner(_) => "inner service error", - Error::Balance(_) => "load balancing failed", - } + self.0.poll().map_err(Into::into) } } @@ -398,7 +372,7 @@ mod tests { impl Discover for ReluctantDisco { type Key = usize; type Service = ReluctantService; - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll, Self::Error> { let r = self @@ -413,10 +387,10 @@ mod tests { impl Service<()> for ReluctantService { type Response = (); - type Error = (); - type Future = future::FutureResult<(), ()>; + type Error = Error; + type Future = future::FutureResult; - fn poll_ready(&mut self) -> Poll<(), ()> { + fn poll_ready(&mut self) -> Poll<(), Self::Error> { if self.polls_until_ready == 0 { return Ok(Async::Ready(())); } diff --git a/tower-discover/src/lib.rs b/tower-discover/src/lib.rs index ce7e6f8..d172781 100644 --- a/tower-discover/src/lib.rs +++ b/tower-discover/src/lib.rs @@ -12,6 +12,7 @@ extern crate tower_service; use futures::{Async, Poll, Stream}; use tower_service::Service; +use std::fmt; use std::hash::Hash; use std::iter::{Enumerate, IntoIterator}; use std::marker::PhantomData; @@ -51,7 +52,6 @@ where { inner: Enumerate, } - // ===== impl List ===== impl List @@ -74,7 +74,7 @@ where { type Key = usize; type Service = U; - type Error = (); + type Error = Never; fn poll(&mut self) -> Poll, Self::Error> { match self.inner.next() { @@ -129,6 +129,18 @@ where } } +#[doc(hidden)] +#[derive(Debug)] +pub enum Never {} + +impl fmt::Display for Never { + fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { + match *self {} + } +} + +impl std::error::Error for Never {} + // check that List can be directly over collections #[cfg(test)] #[allow(dead_code)]