diff --git a/tower-balance/src/future.rs b/tower-balance/src/future.rs new file mode 100644 index 0000000..58a76d0 --- /dev/null +++ b/tower-balance/src/future.rs @@ -0,0 +1,23 @@ +use crate::error::Error; +use futures::{Future, Poll}; + +pub struct ResponseFuture(F); + +impl ResponseFuture { + pub(crate) fn new(future: F) -> ResponseFuture { + ResponseFuture(future) + } +} + +impl Future for ResponseFuture +where + F: Future, + F::Error: Into, +{ + type Item = F::Item; + type Error = Error; + + fn poll(&mut self) -> Poll { + self.0.poll().map_err(Into::into) + } +} diff --git a/tower-balance/src/lib.rs b/tower-balance/src/lib.rs index 0c2771c..0a5dfca 100644 --- a/tower-balance/src/lib.rs +++ b/tower-balance/src/lib.rs @@ -3,15 +3,16 @@ extern crate futures; #[macro_use] extern crate log; extern crate indexmap; -#[cfg(test)] -extern crate quickcheck; extern crate rand; extern crate tokio_timer; extern crate tower_discover; extern crate tower_service; extern crate tower_util; -use futures::{Async, Future, Poll}; +#[cfg(test)] +extern crate quickcheck; + +use futures::{Async, Poll}; use indexmap::IndexMap; use rand::{rngs::SmallRng, SeedableRng}; use std::fmt; @@ -20,14 +21,19 @@ use tower_service::Service; pub mod choose; pub mod error; +pub mod future; pub mod load; pub mod pool; +#[cfg(test)] +mod test; + pub use self::choose::Choose; pub use self::load::Load; pub use self::pool::Pool; use self::error::Error; +use self::future::ResponseFuture; /// Balances requests across a set of inner services. #[derive(Debug)] @@ -53,8 +59,6 @@ pub struct Balance { not_ready: IndexMap, } -pub struct ResponseFuture(F); - // ===== impl Balance ===== impl Balance @@ -338,158 +342,6 @@ where self.dispatched_ready_index = Some(idx); let rsp = svc.call(request); - ResponseFuture(rsp) - } -} - -// ===== impl ResponseFuture ===== - -impl Future for ResponseFuture -where - F: Future, - F::Error: Into, -{ - type Item = F::Item; - type Error = Error; - - fn poll(&mut self) -> Poll { - self.0.poll().map_err(Into::into) - } -} - -#[cfg(test)] -mod tests { - use futures::future; - use quickcheck::*; - use std::collections::VecDeque; - use tower_discover::Change; - - use super::*; - - struct ReluctantDisco(VecDeque>); - - struct ReluctantService { - polls_until_ready: usize, - } - - impl Discover for ReluctantDisco { - type Key = usize; - type Service = ReluctantService; - type Error = Error; - - fn poll(&mut self) -> Poll, Self::Error> { - let r = self - .0 - .pop_front() - .map(Async::Ready) - .unwrap_or(Async::NotReady); - debug!("polling disco: {:?}", r.is_ready()); - Ok(r) - } - } - - impl Service<()> for ReluctantService { - type Response = (); - type Error = Error; - type Future = future::FutureResult; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - if self.polls_until_ready == 0 { - return Ok(Async::Ready(())); - } - - self.polls_until_ready -= 1; - return Ok(Async::NotReady); - } - - fn call(&mut self, _: ()) -> Self::Future { - future::ok(()) - } - } - - quickcheck! { - /// Creates a random number of services, each of which must be polled a random - /// number of times before becoming ready. As the balancer is polled, ensure that - /// it does not become ready prematurely and that services are promoted from - /// not_ready to ready. - fn poll_ready(service_tries: Vec) -> TestResult { - // Stores the number of pending services after each poll_ready call. - let mut pending_at = Vec::new(); - - let disco = { - let mut changes = VecDeque::new(); - for (i, n) in service_tries.iter().map(|n| *n).enumerate() { - for j in 0..n { - if j == pending_at.len() { - pending_at.push(1); - } else { - pending_at[j] += 1; - } - } - - let s = ReluctantService { polls_until_ready: n }; - changes.push_back(Change::Insert(i, s)); - } - ReluctantDisco(changes) - }; - pending_at.push(0); - - let mut balancer = Balance::new(disco, choose::RoundRobin::default()); - - let services = service_tries.len(); - let mut next_pos = 0; - for pending in pending_at.iter().map(|p| *p) { - assert!(pending <= services); - let ready = services - pending; - - match balancer.poll_ready() { - Err(_) => return TestResult::error("poll_ready failed"), - Ok(p) => { - if p.is_ready() != (ready > 0) { - return TestResult::failed(); - } - } - } - - if balancer.num_ready() != ready { - return TestResult::failed(); - } - - if balancer.num_not_ready() != pending { - return TestResult::failed(); - } - - if balancer.is_ready() != (ready > 0) { - return TestResult::failed(); - } - if balancer.is_not_ready() != (ready == 0) { - return TestResult::failed(); - } - - if balancer.dispatched_ready_index.is_some() { - return TestResult::failed(); - } - - if ready == 0 { - if balancer.chosen_ready_index.is_some() { - return TestResult::failed(); - } - } else { - // Check that the round-robin chooser is doing its thing: - match balancer.chosen_ready_index { - None => return TestResult::failed(), - Some(idx) => { - if idx != next_pos { - return TestResult::failed(); - } - } - } - - next_pos = (next_pos + 1) % ready; - } - } - - TestResult::passed() - } + ResponseFuture::new(rsp) } } diff --git a/tower-balance/src/test.rs b/tower-balance/src/test.rs new file mode 100644 index 0000000..37fb5b7 --- /dev/null +++ b/tower-balance/src/test.rs @@ -0,0 +1,136 @@ +use futures::{future, Async, Poll}; +use quickcheck::*; +use std::collections::VecDeque; +use tower_discover::Change; +use tower_service::Service; + +use crate::*; + +type Error = Box; + +struct ReluctantDisco(VecDeque>); + +struct ReluctantService { + polls_until_ready: usize, +} + +impl Discover for ReluctantDisco { + type Key = usize; + type Service = ReluctantService; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let r = self + .0 + .pop_front() + .map(Async::Ready) + .unwrap_or(Async::NotReady); + debug!("polling disco: {:?}", r.is_ready()); + Ok(r) + } +} + +impl Service<()> for ReluctantService { + type Response = (); + type Error = Error; + type Future = future::FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + if self.polls_until_ready == 0 { + return Ok(Async::Ready(())); + } + + self.polls_until_ready -= 1; + return Ok(Async::NotReady); + } + + fn call(&mut self, _: ()) -> Self::Future { + future::ok(()) + } +} + +quickcheck! { + /// Creates a random number of services, each of which must be polled a random + /// number of times before becoming ready. As the balancer is polled, ensure that + /// it does not become ready prematurely and that services are promoted from + /// not_ready to ready. + fn poll_ready(service_tries: Vec) -> TestResult { + // Stores the number of pending services after each poll_ready call. + let mut pending_at = Vec::new(); + + let disco = { + let mut changes = VecDeque::new(); + for (i, n) in service_tries.iter().map(|n| *n).enumerate() { + for j in 0..n { + if j == pending_at.len() { + pending_at.push(1); + } else { + pending_at[j] += 1; + } + } + + let s = ReluctantService { polls_until_ready: n }; + changes.push_back(Change::Insert(i, s)); + } + ReluctantDisco(changes) + }; + pending_at.push(0); + + let mut balancer = Balance::new(disco, choose::RoundRobin::default()); + + let services = service_tries.len(); + let mut next_pos = 0; + for pending in pending_at.iter().map(|p| *p) { + assert!(pending <= services); + let ready = services - pending; + + match balancer.poll_ready() { + Err(_) => return TestResult::error("poll_ready failed"), + Ok(p) => { + if p.is_ready() != (ready > 0) { + return TestResult::failed(); + } + } + } + + if balancer.num_ready() != ready { + return TestResult::failed(); + } + + if balancer.num_not_ready() != pending { + return TestResult::failed(); + } + + if balancer.is_ready() != (ready > 0) { + return TestResult::failed(); + } + if balancer.is_not_ready() != (ready == 0) { + return TestResult::failed(); + } + + if balancer.dispatched_ready_index.is_some() { + return TestResult::failed(); + } + + if ready == 0 { + if balancer.chosen_ready_index.is_some() { + return TestResult::failed(); + } + } else { + // Check that the round-robin chooser is doing its thing: + match balancer.chosen_ready_index { + None => return TestResult::failed(), + Some(idx) => { + if idx != next_pos { + return TestResult::failed(); + } + } + } + + next_pos = (next_pos + 1) % ready; + } + } + + TestResult::passed() + } +}