balance: Consider new nodes more readily (#134)
When a PeakEwma Balancer discovers a single new endpoint, it will not dispatch requests to the new endpoint until the RTT estimate for an existing endpoint exceeds _one second_. This misconfiguration leads to unexpected behavior. When more than one endpoint is discovered, the balancer may eventually dispatch traffic to some of--but not all of--the new enpoints. This change alters the PeakEwma balancer in two ways: First, the previous DEFAULT_RTT_ESTIMATE of 1s has been changed to be configurable (and required). The library should not hard code a default here. Second, the initial RTT value is now decayed over time so that new endpoints will eventually be considered, even when other endpoints are less loaded than the default RTT estimate.
This commit is contained in:
parent
075ffb3725
commit
bdecb33775
|
@ -28,6 +28,7 @@ use tower_service::Service;
|
|||
|
||||
const REQUESTS: usize = 50_000;
|
||||
const CONCURRENCY: usize = 50;
|
||||
const DEFAULT_RTT: Duration = Duration::from_millis(30);
|
||||
static ENDPOINT_CAPACITY: usize = CONCURRENCY;
|
||||
static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [
|
||||
Duration::from_millis(1),
|
||||
|
@ -60,7 +61,7 @@ fn main() {
|
|||
let fut = future::lazy(move || {
|
||||
let decay = Duration::from_secs(10);
|
||||
let d = gen_disco();
|
||||
let pe = lb::Balance::p2c(lb::load::WithPeakEwma::new(d, decay, lb::load::NoInstrument));
|
||||
let pe = lb::Balance::p2c(lb::load::WithPeakEwma::new(d, DEFAULT_RTT, decay, lb::load::NoInstrument));
|
||||
run("P2C+PeakEWMA", pe)
|
||||
});
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ use Load;
|
|||
pub struct PeakEwma<S, I = NoInstrument> {
|
||||
service: S,
|
||||
decay_ns: f64,
|
||||
rtt_estimate: Arc<Mutex<Option<RttEstimate>>>,
|
||||
rtt_estimate: Arc<Mutex<RttEstimate>>,
|
||||
instrument: I,
|
||||
}
|
||||
|
||||
|
@ -45,6 +45,7 @@ pub struct PeakEwma<S, I = NoInstrument> {
|
|||
pub struct WithPeakEwma<D, I = NoInstrument> {
|
||||
discover: D,
|
||||
decay_ns: f64,
|
||||
default_rtt: Duration,
|
||||
instrument: I,
|
||||
}
|
||||
|
||||
|
@ -59,7 +60,7 @@ pub struct Cost(f64);
|
|||
pub struct Handle {
|
||||
sent_at: Instant,
|
||||
decay_ns: f64,
|
||||
rtt_estimate: Arc<Mutex<Option<RttEstimate>>>,
|
||||
rtt_estimate: Arc<Mutex<RttEstimate>>,
|
||||
}
|
||||
|
||||
/// Holds the current RTT estimateand the last time this value was updated.
|
||||
|
@ -68,21 +69,20 @@ struct RttEstimate {
|
|||
rtt_ns: f64,
|
||||
}
|
||||
|
||||
/// The default RTT estimate is used for nodes that have no load information.
|
||||
///
|
||||
/// We want this value to be high enough such that it is higher than most healthy
|
||||
/// endpoints, but not so high that it should be higher than all endpoints in all
|
||||
/// circumstances. To this end, a default estimate of 1 second seems to be a good
|
||||
/// goldilocks value.
|
||||
const DEFAULT_RTT_ESTIMATE: f64 = NANOS_PER_MILLI * 1000.0;
|
||||
|
||||
const NANOS_PER_MILLI: f64 = 1_000_000.0;
|
||||
|
||||
// ===== impl PeakEwma =====
|
||||
|
||||
impl<D, I> WithPeakEwma<D, I>
|
||||
{
|
||||
pub fn new<Request>(discover: D, decay: Duration, instrument: I) -> Self
|
||||
/// Wraps a `D`-typed `Discover` so that services have a `PeakEwma` load metric.
|
||||
///
|
||||
/// The provided `default_rtt` is used as the default RTT estimate for newly
|
||||
/// added services.
|
||||
///
|
||||
/// They `decay` value determines over what time period a RTT estimate should
|
||||
/// decay.
|
||||
pub fn new<Request>(discover: D, default_rtt: Duration, decay: Duration, instrument: I) -> Self
|
||||
where
|
||||
D: Discover,
|
||||
D::Service: Service<Request>,
|
||||
|
@ -91,6 +91,7 @@ impl<D, I> WithPeakEwma<D, I>
|
|||
WithPeakEwma {
|
||||
discover,
|
||||
decay_ns: nanos(decay),
|
||||
default_rtt,
|
||||
instrument,
|
||||
}
|
||||
}
|
||||
|
@ -109,7 +110,10 @@ where
|
|||
use self::Change::*;
|
||||
|
||||
let change = match try_ready!(self.discover.poll()) {
|
||||
Insert(k, svc) => Insert(k, PeakEwma::new(svc, self.decay_ns, self.instrument.clone())),
|
||||
Insert(k, svc) => {
|
||||
let s = PeakEwma::new(svc, self.default_rtt, self.decay_ns, self.instrument.clone());
|
||||
Insert(k, s)
|
||||
}
|
||||
Remove(k) => Remove(k),
|
||||
};
|
||||
|
||||
|
@ -120,11 +124,11 @@ where
|
|||
// ===== impl PeakEwma =====
|
||||
|
||||
impl<S, I> PeakEwma<S, I> {
|
||||
fn new(service: S, decay_ns: f64, instrument: I) -> Self {
|
||||
fn new(service: S, default_rtt: Duration, decay_ns: f64, instrument: I) -> Self {
|
||||
Self {
|
||||
service,
|
||||
decay_ns,
|
||||
rtt_estimate: Arc::new(Mutex::new(None)),
|
||||
rtt_estimate: Arc::new(Mutex::new(RttEstimate::new(nanos(default_rtt)))),
|
||||
instrument,
|
||||
}
|
||||
}
|
||||
|
@ -164,13 +168,7 @@ impl<S, I> Load for PeakEwma<S, I> {
|
|||
|
||||
// Update the RTT estimate to account for decay since the last update.
|
||||
// If an estimate has not been established, a default is provided
|
||||
let estimate = {
|
||||
let mut rtt = self.rtt_estimate.lock().expect("peak ewma prior_estimate");
|
||||
match *rtt {
|
||||
Some(ref mut rtt) => rtt.decay(self.decay_ns),
|
||||
None => DEFAULT_RTT_ESTIMATE,
|
||||
}
|
||||
};
|
||||
let estimate = self.update_estimate();
|
||||
|
||||
let cost = Cost(estimate * f64::from(pending + 1));
|
||||
trace!(
|
||||
|
@ -183,19 +181,20 @@ impl<S, I> Load for PeakEwma<S, I> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<S, I> PeakEwma<S, I> {
|
||||
fn update_estimate(&self) -> f64 {
|
||||
let mut rtt = self.rtt_estimate.lock().expect("peak ewma prior_estimate");
|
||||
rtt.decay(self.decay_ns)
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl RttEstimate =====
|
||||
|
||||
impl RttEstimate {
|
||||
fn new(sent_at: Instant, recv_at: Instant) -> Self {
|
||||
debug_assert!(
|
||||
sent_at <= recv_at,
|
||||
"recv_at={:?} after sent_at={:?}",
|
||||
recv_at,
|
||||
sent_at
|
||||
);
|
||||
|
||||
fn new(rtt_ns: f64) -> Self {
|
||||
debug_assert!(0.0 < rtt_ns, "rtt must be positive");
|
||||
Self {
|
||||
rtt_ns: nanos(recv_at - sent_at),
|
||||
rtt_ns,
|
||||
update_at: clock::now(),
|
||||
}
|
||||
}
|
||||
|
@ -265,12 +264,7 @@ impl Drop for Handle {
|
|||
let recv_at = clock::now();
|
||||
|
||||
if let Ok(mut rtt) = self.rtt_estimate.lock() {
|
||||
if let Some(ref mut rtt) = *rtt {
|
||||
rtt.update(self.sent_at, recv_at, self.decay_ns);
|
||||
return;
|
||||
}
|
||||
|
||||
*rtt = Some(RttEstimate::new(self.sent_at, recv_at));
|
||||
rtt.update(self.sent_at, recv_at, self.decay_ns);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -321,6 +315,8 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
/// The default RTT estimate decays, so that new nodes are considered if the
|
||||
/// default RTT is too high.
|
||||
#[test]
|
||||
fn default_decay() {
|
||||
let time = Arc::new(Mutex::new(Instant::now()));
|
||||
|
@ -329,16 +325,40 @@ mod tests {
|
|||
let mut enter = enter().expect("enter");
|
||||
clock::with_default(&clock, &mut enter, |_| {
|
||||
|
||||
let mut svc = PeakEwma::new(Svc, NANOS_PER_MILLI * 1_000.0, NoInstrument);
|
||||
assert_eq!(svc.load(), Cost(DEFAULT_RTT_ESTIMATE));
|
||||
let svc = PeakEwma::new(Svc, Duration::from_millis(10), NANOS_PER_MILLI * 1_000.0, NoInstrument);
|
||||
let Cost(load) = svc.load();
|
||||
assert_eq!(load, 10.0 * NANOS_PER_MILLI);
|
||||
|
||||
*time.lock().unwrap() += Duration::from_millis(100);
|
||||
let Cost(load) = svc.load();
|
||||
assert!(9.0 * NANOS_PER_MILLI < load && load < 10.0 * NANOS_PER_MILLI);
|
||||
|
||||
*time.lock().unwrap() += Duration::from_millis(100);
|
||||
let Cost(load) = svc.load();
|
||||
assert!(8.0 * NANOS_PER_MILLI < load && load < 9.0 * NANOS_PER_MILLI);
|
||||
});
|
||||
}
|
||||
|
||||
/// The default RTT estimate decays, so that new nodes are considered if the
|
||||
/// default RTT is too high.
|
||||
#[test]
|
||||
fn compound_decay() {
|
||||
let time = Arc::new(Mutex::new(Instant::now()));
|
||||
let clock = clock::Clock::new_with_now(Now(time.clone()));
|
||||
|
||||
let mut enter = enter().expect("enter");
|
||||
clock::with_default(&clock, &mut enter, |_| {
|
||||
|
||||
let mut svc = PeakEwma::new(Svc, Duration::from_millis(20), NANOS_PER_MILLI * 1_000.0, NoInstrument);
|
||||
assert_eq!(svc.load(), Cost(20.0 * NANOS_PER_MILLI));
|
||||
|
||||
*time.lock().unwrap() += Duration::from_millis(100);
|
||||
let rsp0 = svc.call(());
|
||||
assert_eq!(svc.load(), Cost(2.0 * DEFAULT_RTT_ESTIMATE));
|
||||
assert!(svc.load() > Cost(20.0 * NANOS_PER_MILLI));
|
||||
|
||||
*time.lock().unwrap() += Duration::from_millis(100);
|
||||
let rsp1 = svc.call(());
|
||||
assert_eq!(svc.load(), Cost(3.0 * DEFAULT_RTT_ESTIMATE));
|
||||
assert!(svc.load() > Cost(40.0 * NANOS_PER_MILLI));
|
||||
|
||||
*time.lock().unwrap() += Duration::from_millis(100);
|
||||
let () = rsp0.wait().unwrap();
|
||||
|
|
Loading…
Reference in New Issue