balance: minor cleanup (#220)

This commit is contained in:
Carl Lerche 2019-03-29 19:17:21 -07:00 committed by GitHub
parent 83be955afe
commit d7516c3222
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 169 additions and 158 deletions

View File

@ -0,0 +1,23 @@
use crate::error::Error;
use futures::{Future, Poll};
pub struct ResponseFuture<F>(F);
impl<F> ResponseFuture<F> {
pub(crate) fn new(future: F) -> ResponseFuture<F> {
ResponseFuture(future)
}
}
impl<F> Future for ResponseFuture<F>
where
F: Future,
F::Error: Into<Error>,
{
type Item = F::Item;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll().map_err(Into::into)
}
}

View File

@ -3,15 +3,16 @@ extern crate futures;
#[macro_use]
extern crate log;
extern crate indexmap;
#[cfg(test)]
extern crate quickcheck;
extern crate rand;
extern crate tokio_timer;
extern crate tower_discover;
extern crate tower_service;
extern crate tower_util;
use futures::{Async, Future, Poll};
#[cfg(test)]
extern crate quickcheck;
use futures::{Async, Poll};
use indexmap::IndexMap;
use rand::{rngs::SmallRng, SeedableRng};
use std::fmt;
@ -20,14 +21,19 @@ use tower_service::Service;
pub mod choose;
pub mod error;
pub mod future;
pub mod load;
pub mod pool;
#[cfg(test)]
mod test;
pub use self::choose::Choose;
pub use self::load::Load;
pub use self::pool::Pool;
use self::error::Error;
use self::future::ResponseFuture;
/// Balances requests across a set of inner services.
#[derive(Debug)]
@ -53,8 +59,6 @@ pub struct Balance<D: Discover, C> {
not_ready: IndexMap<D::Key, D::Service>,
}
pub struct ResponseFuture<F>(F);
// ===== impl Balance =====
impl<D> Balance<D, choose::PowerOfTwoChoices>
@ -338,158 +342,6 @@ where
self.dispatched_ready_index = Some(idx);
let rsp = svc.call(request);
ResponseFuture(rsp)
}
}
// ===== impl ResponseFuture =====
impl<F> Future for ResponseFuture<F>
where
F: Future,
F::Error: Into<Error>,
{
type Item = F::Item;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll().map_err(Into::into)
}
}
#[cfg(test)]
mod tests {
use futures::future;
use quickcheck::*;
use std::collections::VecDeque;
use tower_discover::Change;
use super::*;
struct ReluctantDisco(VecDeque<Change<usize, ReluctantService>>);
struct ReluctantService {
polls_until_ready: usize,
}
impl Discover for ReluctantDisco {
type Key = usize;
type Service = ReluctantService;
type Error = Error;
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
let r = self
.0
.pop_front()
.map(Async::Ready)
.unwrap_or(Async::NotReady);
debug!("polling disco: {:?}", r.is_ready());
Ok(r)
}
}
impl Service<()> for ReluctantService {
type Response = ();
type Error = Error;
type Future = future::FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if self.polls_until_ready == 0 {
return Ok(Async::Ready(()));
}
self.polls_until_ready -= 1;
return Ok(Async::NotReady);
}
fn call(&mut self, _: ()) -> Self::Future {
future::ok(())
}
}
quickcheck! {
/// Creates a random number of services, each of which must be polled a random
/// number of times before becoming ready. As the balancer is polled, ensure that
/// it does not become ready prematurely and that services are promoted from
/// not_ready to ready.
fn poll_ready(service_tries: Vec<usize>) -> TestResult {
// Stores the number of pending services after each poll_ready call.
let mut pending_at = Vec::new();
let disco = {
let mut changes = VecDeque::new();
for (i, n) in service_tries.iter().map(|n| *n).enumerate() {
for j in 0..n {
if j == pending_at.len() {
pending_at.push(1);
} else {
pending_at[j] += 1;
}
}
let s = ReluctantService { polls_until_ready: n };
changes.push_back(Change::Insert(i, s));
}
ReluctantDisco(changes)
};
pending_at.push(0);
let mut balancer = Balance::new(disco, choose::RoundRobin::default());
let services = service_tries.len();
let mut next_pos = 0;
for pending in pending_at.iter().map(|p| *p) {
assert!(pending <= services);
let ready = services - pending;
match balancer.poll_ready() {
Err(_) => return TestResult::error("poll_ready failed"),
Ok(p) => {
if p.is_ready() != (ready > 0) {
return TestResult::failed();
}
}
}
if balancer.num_ready() != ready {
return TestResult::failed();
}
if balancer.num_not_ready() != pending {
return TestResult::failed();
}
if balancer.is_ready() != (ready > 0) {
return TestResult::failed();
}
if balancer.is_not_ready() != (ready == 0) {
return TestResult::failed();
}
if balancer.dispatched_ready_index.is_some() {
return TestResult::failed();
}
if ready == 0 {
if balancer.chosen_ready_index.is_some() {
return TestResult::failed();
}
} else {
// Check that the round-robin chooser is doing its thing:
match balancer.chosen_ready_index {
None => return TestResult::failed(),
Some(idx) => {
if idx != next_pos {
return TestResult::failed();
}
}
}
next_pos = (next_pos + 1) % ready;
}
}
TestResult::passed()
}
ResponseFuture::new(rsp)
}
}

136
tower-balance/src/test.rs Normal file
View File

@ -0,0 +1,136 @@
use futures::{future, Async, Poll};
use quickcheck::*;
use std::collections::VecDeque;
use tower_discover::Change;
use tower_service::Service;
use crate::*;
type Error = Box<dyn std::error::Error + Send + Sync>;
struct ReluctantDisco(VecDeque<Change<usize, ReluctantService>>);
struct ReluctantService {
polls_until_ready: usize,
}
impl Discover for ReluctantDisco {
type Key = usize;
type Service = ReluctantService;
type Error = Error;
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
let r = self
.0
.pop_front()
.map(Async::Ready)
.unwrap_or(Async::NotReady);
debug!("polling disco: {:?}", r.is_ready());
Ok(r)
}
}
impl Service<()> for ReluctantService {
type Response = ();
type Error = Error;
type Future = future::FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if self.polls_until_ready == 0 {
return Ok(Async::Ready(()));
}
self.polls_until_ready -= 1;
return Ok(Async::NotReady);
}
fn call(&mut self, _: ()) -> Self::Future {
future::ok(())
}
}
quickcheck! {
/// Creates a random number of services, each of which must be polled a random
/// number of times before becoming ready. As the balancer is polled, ensure that
/// it does not become ready prematurely and that services are promoted from
/// not_ready to ready.
fn poll_ready(service_tries: Vec<usize>) -> TestResult {
// Stores the number of pending services after each poll_ready call.
let mut pending_at = Vec::new();
let disco = {
let mut changes = VecDeque::new();
for (i, n) in service_tries.iter().map(|n| *n).enumerate() {
for j in 0..n {
if j == pending_at.len() {
pending_at.push(1);
} else {
pending_at[j] += 1;
}
}
let s = ReluctantService { polls_until_ready: n };
changes.push_back(Change::Insert(i, s));
}
ReluctantDisco(changes)
};
pending_at.push(0);
let mut balancer = Balance::new(disco, choose::RoundRobin::default());
let services = service_tries.len();
let mut next_pos = 0;
for pending in pending_at.iter().map(|p| *p) {
assert!(pending <= services);
let ready = services - pending;
match balancer.poll_ready() {
Err(_) => return TestResult::error("poll_ready failed"),
Ok(p) => {
if p.is_ready() != (ready > 0) {
return TestResult::failed();
}
}
}
if balancer.num_ready() != ready {
return TestResult::failed();
}
if balancer.num_not_ready() != pending {
return TestResult::failed();
}
if balancer.is_ready() != (ready > 0) {
return TestResult::failed();
}
if balancer.is_not_ready() != (ready == 0) {
return TestResult::failed();
}
if balancer.dispatched_ready_index.is_some() {
return TestResult::failed();
}
if ready == 0 {
if balancer.chosen_ready_index.is_some() {
return TestResult::failed();
}
} else {
// Check that the round-robin chooser is doing its thing:
match balancer.chosen_ready_index {
None => return TestResult::failed(),
Some(idx) => {
if idx != next_pos {
return TestResult::failed();
}
}
}
next_pos = (next_pos + 1) % ready;
}
}
TestResult::passed()
}
}