From 73c74252e685ed4c824b189301ea99a4f601d67d Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Sat, 27 Apr 2019 09:32:26 -0700 Subject: [PATCH] Add hedge retry middleware (#236) Add tower-hedge, a layer that preemptively retries requests which have been outstanding for longer than a given latency percentile. If either of the original future or the retry future completes, that value is used. For more information about hedge requests, see: [The Tail at Scale][1] [1]: https://cseweb.ucsd.edu/~gmporter/classes/fa17/cse124/post/schedule/p74-dean.pdf Signed-off-by: Alex Leong --- Cargo.toml | 1 + tower-filter/src/lib.rs | 2 +- tower-hedge/Cargo.toml | 18 ++ tower-hedge/README.md | 0 tower-hedge/src/delay.rs | 99 ++++++++++ tower-hedge/src/latency.rs | 85 +++++++++ tower-hedge/src/lib.rs | 255 +++++++++++++++++++++++++ tower-hedge/src/rotating_histogram.rs | 75 ++++++++ tower-hedge/src/select.rs | 100 ++++++++++ tower-hedge/tests/hedge.rs | 177 +++++++++++++++++ tower-hedge/tests/support/mod.rs | 264 ++++++++++++++++++++++++++ 11 files changed, 1075 insertions(+), 1 deletion(-) create mode 100644 tower-hedge/Cargo.toml create mode 100644 tower-hedge/README.md create mode 100644 tower-hedge/src/delay.rs create mode 100644 tower-hedge/src/latency.rs create mode 100644 tower-hedge/src/lib.rs create mode 100644 tower-hedge/src/rotating_histogram.rs create mode 100644 tower-hedge/src/select.rs create mode 100644 tower-hedge/tests/hedge.rs create mode 100644 tower-hedge/tests/support/mod.rs diff --git a/Cargo.toml b/Cargo.toml index af7095e..15e655d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "tower-buffer", "tower-discover", "tower-filter", + "tower-hedge", "tower-layer", "tower-limit", "tower-load-shed", diff --git a/tower-filter/src/lib.rs b/tower-filter/src/lib.rs index 84ce453..5aad80f 100644 --- a/tower-filter/src/lib.rs +++ b/tower-filter/src/lib.rs @@ -16,7 +16,7 @@ use crate::{error::Error, future::ResponseFuture}; use futures::Poll; use tower_service::Service; -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Filter { inner: T, predicate: U, diff --git a/tower-hedge/Cargo.toml b/tower-hedge/Cargo.toml new file mode 100644 index 0000000..9fcf134 --- /dev/null +++ b/tower-hedge/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "tower-hedge" +version = "0.1.0" +authors = ["Alex Leong "] +publish = false + +[dependencies] +futures = "0.1" +hdrhistogram = "6.0" +log = "0.4.1" +tower-service = "0.2.0" +tower-filter = { version = "0.1", path = "../tower-filter" } +tokio-mock-task = { git = "https://github.com/carllerche/tokio-mock-task" } +tokio-timer = "0.2.6" + +[dev-dependencies] +tower-test = { version = "0.1", path = "../tower-test" } +tokio-executor = "0.1.2" diff --git a/tower-hedge/README.md b/tower-hedge/README.md new file mode 100644 index 0000000..e69de29 diff --git a/tower-hedge/src/delay.rs b/tower-hedge/src/delay.rs new file mode 100644 index 0000000..21ff048 --- /dev/null +++ b/tower-hedge/src/delay.rs @@ -0,0 +1,99 @@ +use futures::{Async, Future, Poll}; +use tower_service::Service; + +use std::time::Duration; + +/// A policy which specifies how long each request should be delayed for. +pub trait Policy { + fn delay(&self, req: &Request) -> Duration; +} + +/// A middleware which delays sending the request to the underlying service +/// for an amount of time specified by the policy. +#[derive(Debug)] +pub struct Delay { + policy: P, + service: S, +} + +#[derive(Debug)] +pub struct ResponseFuture { + service: S, + state: State, +} + +#[derive(Debug)] +enum State { + Delaying(tokio_timer::Delay, Option), + Called(F), +} + +impl Delay { + pub fn new(policy: P, service: S) -> Self + where + P: Policy, + S: Service + Clone, + S::Error: Into, + { + Delay { policy, service } + } +} + +impl Service for Delay +where + P: Policy, + S: Service + Clone, + S::Error: Into, +{ + type Response = S::Response; + type Error = super::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.service.poll_ready().map_err(|e| e.into()) + } + + fn call(&mut self, request: Request) -> Self::Future { + let deadline = tokio_timer::clock::now() + self.policy.delay(&request); + let mut cloned = self.service.clone(); + // Pass the original service to the ResponseFuture and keep the cloned service on self. + let orig = { + std::mem::swap(&mut cloned, &mut self.service); + cloned + }; + ResponseFuture { + service: orig, + state: State::Delaying(tokio_timer::Delay::new(deadline), Some(request)), + } + } +} + +impl Future for ResponseFuture +where + F: Future, + F::Error: Into, + S: Service, +{ + type Item = F::Item; + type Error = super::Error; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.state { + State::Delaying(ref mut delay, ref mut req) => match delay.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(())) => { + let req = req.take().expect("Missing request in delay"); + let fut = self.service.call(req); + State::Called(fut) + } + Err(e) => return Err(e.into()), + }, + State::Called(ref mut fut) => { + return fut.poll().map_err(|e| e.into()); + } + }; + self.state = next; + } + } +} diff --git a/tower-hedge/src/latency.rs b/tower-hedge/src/latency.rs new file mode 100644 index 0000000..5633857 --- /dev/null +++ b/tower-hedge/src/latency.rs @@ -0,0 +1,85 @@ +use futures::{Async, Future, Poll}; +use tokio_timer::clock; +use tower_service::Service; + +use std::time::{Duration, Instant}; + +/// Record is the interface for accepting request latency measurements. When +/// a request completes, record is called with the elapsed duration between +/// when the service was called and when the future completed. +pub trait Record { + fn record(&mut self, latency: Duration); +} + +/// Latency is a middleware that measures request latency and records it to the +/// provided Record instance. +#[derive(Clone, Debug)] +pub struct Latency { + rec: R, + service: S, +} + +#[derive(Debug)] +pub struct ResponseFuture { + start: Instant, + rec: R, + inner: F, +} + +impl Latency +where + R: Record + Clone, +{ + pub fn new(rec: R, service: S) -> Self + where + S: Service, + S::Error: Into, + { + Latency { rec, service } + } +} + +impl Service for Latency +where + S: Service, + S::Error: Into, + R: Record + Clone, +{ + type Response = S::Response; + type Error = super::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.service.poll_ready().map_err(|e| e.into()) + } + + fn call(&mut self, request: Request) -> Self::Future { + ResponseFuture { + start: clock::now(), + rec: self.rec.clone(), + inner: self.service.call(request), + } + } +} + +impl Future for ResponseFuture +where + R: Record, + F: Future, + F::Error: Into, +{ + type Item = F::Item; + type Error = super::Error; + + fn poll(&mut self) -> Poll { + match self.inner.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(rsp)) => { + let duration = clock::now() - self.start; + self.rec.record(duration); + Ok(Async::Ready(rsp)) + } + Err(e) => Err(e.into()), + } + } +} diff --git a/tower-hedge/src/lib.rs b/tower-hedge/src/lib.rs new file mode 100644 index 0000000..dc4b761 --- /dev/null +++ b/tower-hedge/src/lib.rs @@ -0,0 +1,255 @@ +//! Pre-emptively retry requests which have been outstanding for longer +//! than a given latency percentile. + +#![deny(warnings)] +#![deny(missing_docs)] +extern crate futures; +extern crate hdrhistogram; +#[macro_use] +extern crate log; +extern crate tokio_timer; +extern crate tower_filter; +extern crate tower_service; + +use futures::future::FutureResult; +use futures::{future, Poll}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tower_filter::Filter; + +mod delay; +mod latency; +mod rotating_histogram; +mod select; + +use delay::Delay; +use latency::Latency; +use rotating_histogram::RotatingHistogram; +use select::Select; + +type Histo = Arc>; +type Service = select::Select< + SelectPolicy

, + Latency, + Delay, PolicyPredicate

>>, +>; +/// A middleware that pre-emptively retries requests which have been outstanding +/// for longer than a given latency percentile. If either of the original +/// future or the retry future completes, that value is used. +#[derive(Debug)] +pub struct Hedge(Service); +/// The Future returned by the hedge Service. +pub struct Future( as tower_service::Service>::Future) +where + S: tower_service::Service + Clone, + S::Error: Into, + P: Policy + Clone; + +type Error = Box; + +/// A policy which describes which requests can be cloned and then whether those +/// requests should be retried. +pub trait Policy { + /// clone_request is called when the request is first received to determine + /// if the request is retryable. + fn clone_request(&self, req: &Request) -> Option; + /// can_retry is called after the hedge timeout to determine if the hedge + /// retry should be issued. + fn can_retry(&self, req: &Request) -> bool; +} + +#[derive(Clone, Debug)] +struct PolicyPredicate

(P); +#[derive(Debug)] +struct DelayPolicy { + histo: Histo, + latency_percentile: f32, +} +#[derive(Debug)] +struct SelectPolicy

{ + policy: P, + histo: Histo, + min_data_points: u64, +} + +impl Hedge { + /// Create a new hedge middleware. + pub fn new( + service: S, + policy: P, + min_data_points: u64, + latency_percentile: f32, + period: Duration, + ) -> Hedge + where + S: tower_service::Service + Clone, + S::Error: Into, + P: Policy + Clone, + { + let histo = Arc::new(Mutex::new(RotatingHistogram::new(period))); + Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo) + } + + /// A hedge middleware with a prepopulated latency histogram. This is usedful + /// for integration tests. + pub fn new_with_mock_latencies( + service: S, + policy: P, + min_data_points: u64, + latency_percentile: f32, + period: Duration, + latencies_ms: &[u64], + ) -> Hedge + where + S: tower_service::Service + Clone, + S::Error: Into, + P: Policy + Clone, + { + let histo = Arc::new(Mutex::new(RotatingHistogram::new(period))); + { + let mut locked = histo.lock().unwrap(); + for latency in latencies_ms.iter() { + locked.read().record(*latency).unwrap(); + } + } + Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo) + } + + fn new_with_histo( + service: S, + policy: P, + min_data_points: u64, + latency_percentile: f32, + histo: Histo, + ) -> Hedge + where + S: tower_service::Service + Clone, + S::Error: Into, + P: Policy + Clone, + { + // Clone the underlying service and wrap both copies in a middleware that + // records the latencies in a rotating histogram. + let recorded_a = Latency::new(histo.clone(), service.clone()); + let recorded_b = Latency::new(histo.clone(), service); + + // Check policy to see if the hedge request should be issued. + let filtered = Filter::new(recorded_b, PolicyPredicate(policy.clone())); + + // Delay the second request by a percentile of the recorded request latency + // histogram. + let delay_policy = DelayPolicy { + histo: histo.clone(), + latency_percentile, + }; + let delayed = Delay::new(delay_policy, filtered); + + // If the request is retryable, issue two requests -- the second one delayed + // by a latency percentile. Use the first result to complete. + let select_policy = SelectPolicy { + policy, + histo, + min_data_points, + }; + Hedge(Select::new(select_policy, recorded_a, delayed)) + } +} + +impl tower_service::Service for Hedge +where + S: tower_service::Service + Clone, + S::Error: Into, + P: Policy + Clone, +{ + type Response = S::Response; + type Error = Error; + type Future = Future; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.0.poll_ready() + } + + fn call(&mut self, request: Request) -> Self::Future { + Future(self.0.call(request)) + } +} + +impl futures::Future for Future +where + S: tower_service::Service + Clone, + S::Error: Into, + P: Policy + Clone, +{ + type Item = S::Response; + type Error = Error; + + fn poll(&mut self) -> Poll { + self.0.poll() + } +} + +// TODO: Remove when Duration::as_millis() becomes stable. +const NANOS_PER_MILLI: u32 = 1_000_000; +const MILLIS_PER_SEC: u64 = 1_000; +fn millis(duration: Duration) -> u64 { + // Round up. + let millis = (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI; + duration + .as_secs() + .saturating_mul(MILLIS_PER_SEC) + .saturating_add(u64::from(millis)) +} + +impl latency::Record for Histo { + fn record(&mut self, latency: Duration) { + let mut locked = self.lock().unwrap(); + locked.write().record(millis(latency)).unwrap_or_else(|e| { + error!("Failed to write to hedge histogram: {:?}", e); + }) + } +} + +impl tower_filter::Predicate for PolicyPredicate

+where + P: Policy, +{ + type Future = future::Either< + FutureResult<(), tower_filter::error::Error>, + future::Empty<(), tower_filter::error::Error>, + >; + + fn check(&mut self, request: &Request) -> Self::Future { + if self.0.can_retry(request) { + future::Either::A(future::ok(())) + } else { + // If the hedge retry should not be issued, we simply want to wait + // for the result of the original request. Therefore we don't want + // to return an error here. Instead, we use future::empty to ensure + // that the original request wins the select. + future::Either::B(future::empty()) + } + } +} + +impl delay::Policy for DelayPolicy { + fn delay(&self, _req: &Request) -> Duration { + let mut locked = self.histo.lock().unwrap(); + let millis = locked + .read() + .value_at_quantile(self.latency_percentile.into()); + Duration::from_millis(millis) + } +} + +impl select::Policy for SelectPolicy

+where + P: Policy, +{ + fn clone_request(&self, req: &Request) -> Option { + self.policy.clone_request(req).filter(|_| { + let mut locked = self.histo.lock().unwrap(); + // Do not attempt a retry if there are insufficiently many data + // points in the histogram. + locked.read().len() >= self.min_data_points + }) + } +} diff --git a/tower-hedge/src/rotating_histogram.rs b/tower-hedge/src/rotating_histogram.rs new file mode 100644 index 0000000..7caea77 --- /dev/null +++ b/tower-hedge/src/rotating_histogram.rs @@ -0,0 +1,75 @@ +extern crate tokio_timer; + +use hdrhistogram::Histogram; +use std::time::{Duration, Instant}; + +use tokio_timer::clock; + +/// This represents a "rotating" histogram which stores two histogram, one which +/// should be read and one which should be written to. Every period, the read +/// histogram is discarded and replaced by the write histogram. The idea here +/// is that the read histogram should always contain a full period (the previous +/// period) of write operations. +#[derive(Debug)] +pub struct RotatingHistogram { + read: Histogram, + write: Histogram, + last_rotation: Instant, + period: Duration, +} + +impl RotatingHistogram { + pub fn new(period: Duration) -> RotatingHistogram { + RotatingHistogram { + read: Histogram::::new_with_bounds(1, 10_000, 3) + .expect("Invalid histogram params"), + write: Histogram::::new_with_bounds(1, 10_000, 3) + .expect("Invalid histogram params"), + last_rotation: clock::now(), + period, + } + } + + pub fn read(&mut self) -> &mut Histogram { + self.maybe_rotate(); + &mut self.read + } + + pub fn write(&mut self) -> &mut Histogram { + self.maybe_rotate(); + &mut self.write + } + + fn maybe_rotate(&mut self) { + let delta = clock::now() - self.last_rotation; + // TODO: replace with delta.duration_div when it becomes stable. + let rotations = (nanos(delta) / nanos(self.period)) as u32; + if rotations >= 2 { + trace!("Time since last rotation is {:?}. clearing!", delta); + self.clear(); + } else if rotations == 1 { + trace!("Time since last rotation is {:?}. rotating!", delta); + self.rotate(); + } + self.last_rotation += self.period * rotations; + } + + fn rotate(&mut self) { + std::mem::swap(&mut self.read, &mut self.write); + trace!("Rotated {:?} points into read", self.read.len()); + self.write.clear(); + } + + fn clear(&mut self) { + self.read.clear(); + self.write.clear(); + } +} + +const NANOS_PER_SEC: u64 = 1_000_000_000; +fn nanos(duration: Duration) -> u64 { + duration + .as_secs() + .saturating_mul(NANOS_PER_SEC) + .saturating_add(u64::from(duration.subsec_nanos())) +} diff --git a/tower-hedge/src/select.rs b/tower-hedge/src/select.rs new file mode 100644 index 0000000..a78a6a1 --- /dev/null +++ b/tower-hedge/src/select.rs @@ -0,0 +1,100 @@ +use futures::{Async, Future, Poll}; +use tower_service::Service; + +/// A policy which decides which requests can be cloned and sent to the B +/// service. +pub trait Policy { + fn clone_request(&self, req: &Request) -> Option; +} + +/// Select is a middleware which attempts to clone the request and sends the +/// original request to the A service and, if the request was able to be cloned, +/// the cloned request to the B service. Both resulting futures will be polled +/// and whichever future completes first will be used as the result. +#[derive(Debug)] +pub struct Select { + policy: P, + a: A, + b: B, +} + +#[derive(Debug)] +pub struct ResponseFuture { + a_fut: AF, + b_fut: Option, +} + +impl Select { + pub fn new(policy: P, a: A, b: B) -> Self + where + P: Policy, + A: Service, + A::Error: Into, + B: Service, + B::Error: Into, + { + Select { policy, a, b } + } +} + +impl Service for Select +where + P: Policy, + A: Service, + A::Error: Into, + B: Service, + B::Error: Into, +{ + type Response = A::Response; + type Error = super::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + let a = self.a.poll_ready().map_err(|e| e.into())?; + let b = self.b.poll_ready().map_err(|e| e.into())?; + if a.is_ready() && b.is_ready() { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + } + + fn call(&mut self, request: Request) -> Self::Future { + let b_fut = if let Some(cloned_req) = self.policy.clone_request(&request) { + Some(self.b.call(cloned_req)) + } else { + None + }; + ResponseFuture { + a_fut: self.a.call(request), + b_fut, + } + } +} + +impl Future for ResponseFuture +where + AF: Future, + AF::Error: Into, + BF: Future, + BF::Error: Into, +{ + type Item = AF::Item; + type Error = super::Error; + + fn poll(&mut self) -> Poll { + match self.a_fut.poll() { + Ok(Async::NotReady) => {} + Ok(Async::Ready(a)) => return Ok(Async::Ready(a)), + Err(e) => return Err(e.into()), + } + if let Some(ref mut b_fut) = self.b_fut { + match b_fut.poll() { + Ok(Async::NotReady) => {} + Ok(Async::Ready(b)) => return Ok(Async::Ready(b)), + Err(e) => return Err(e.into()), + } + } + return Ok(Async::NotReady); + } +} diff --git a/tower-hedge/tests/hedge.rs b/tower-hedge/tests/hedge.rs new file mode 100644 index 0000000..edae7cd --- /dev/null +++ b/tower-hedge/tests/hedge.rs @@ -0,0 +1,177 @@ +extern crate futures; +extern crate tokio_executor; +extern crate tokio_mock_task; +extern crate tokio_timer; +extern crate tower_hedge as hedge; +extern crate tower_service; +extern crate tower_test; + +#[macro_use] +mod support; +use support::*; + +use futures::Future; +use hedge::{Hedge, Policy}; +use tower_service::Service; + +use std::time::Duration; + +#[test] +fn hedge_orig_completes_first() { + let (mut service, mut handle) = new_service(TestPolicy); + + mocked(|timer, _| { + let mut fut = service.call("orig"); + // Check that orig request has been issued. + let (_, req) = handle.next_request().expect("orig"); + // Check fut is not ready. + assert!(fut.poll().unwrap().is_not_ready()); + + // Check hedge has not been issued. + assert!(handle.poll_request().unwrap().is_not_ready()); + advance(timer, ms(10)); + // Check fut is not ready. + assert!(fut.poll().unwrap().is_not_ready()); + // Check that the hedge has been issued. + let (_, _hedge_req) = handle.next_request().expect("hedge"); + + req.send_response("orig-done"); + // Check that fut gets orig response. + assert_eq!(fut.wait().unwrap(), "orig-done"); + }); +} + +#[test] +fn hedge_hedge_completes_first() { + let (mut service, mut handle) = new_service(TestPolicy); + + mocked(|timer, _| { + let mut fut = service.call("orig"); + // Check that orig request has been issued. + let (_, _req) = handle.next_request().expect("orig"); + // Check fut is not ready. + assert!(fut.poll().unwrap().is_not_ready()); + + // Check hedge has not been issued. + assert!(handle.poll_request().unwrap().is_not_ready()); + advance(timer, ms(10)); + // Check fut is not ready. + assert!(fut.poll().unwrap().is_not_ready()); + + // Check that the hedge has been issued. + let (_, hedge_req) = handle.next_request().expect("hedge"); + hedge_req.send_response("hedge-done"); + // Check that fut gets hedge response. + assert_eq!(fut.wait().unwrap(), "hedge-done"); + }); +} + +#[test] +fn completes_before_hedge() { + let (mut service, mut handle) = new_service(TestPolicy); + + mocked(|_, _| { + let mut fut = service.call("orig"); + // Check that orig request has been issued. + let (_, req) = handle.next_request().expect("orig"); + // Check fut is not ready. + assert!(fut.poll().unwrap().is_not_ready()); + + req.send_response("orig-done"); + // Check hedge has not been issued. + assert!(handle.poll_request().unwrap().is_not_ready()); + // Check that fut gets orig response. + assert_eq!(fut.wait().unwrap(), "orig-done"); + }); +} + +#[test] +fn request_not_retyable() { + let (mut service, mut handle) = new_service(TestPolicy); + + mocked(|timer, _| { + let mut fut = service.call(NOT_RETRYABLE); + // Check that orig request has been issued. + let (_, req) = handle.next_request().expect("orig"); + // Check fut is not ready. + assert!(fut.poll().unwrap().is_not_ready()); + + // Check hedge has not been issued. + assert!(handle.poll_request().unwrap().is_not_ready()); + advance(timer, ms(10)); + // Check fut is not ready. + assert!(fut.poll().unwrap().is_not_ready()); + // Check hedge has not been issued. + assert!(handle.poll_request().unwrap().is_not_ready()); + + req.send_response("orig-done"); + // Check that fut gets orig response. + assert_eq!(fut.wait().unwrap(), "orig-done"); + }); +} + +#[test] +fn request_not_clonable() { + let (mut service, mut handle) = new_service(TestPolicy); + + mocked(|timer, _| { + let mut fut = service.call(NOT_CLONABLE); + // Check that orig request has been issued. + let (_, req) = handle.next_request().expect("orig"); + // Check fut is not ready. + assert!(fut.poll().unwrap().is_not_ready()); + + // Check hedge has not been issued. + assert!(handle.poll_request().unwrap().is_not_ready()); + advance(timer, ms(10)); + // Check fut is not ready. + assert!(fut.poll().unwrap().is_not_ready()); + // Check hedge has not been issued. + assert!(handle.poll_request().unwrap().is_not_ready()); + + req.send_response("orig-done"); + // Check that fut gets orig response. + assert_eq!(fut.wait().unwrap(), "orig-done"); + }); +} + +type Req = &'static str; +type Res = &'static str; +type Mock = tower_test::mock::Mock; +type Handle = tower_test::mock::Handle; + +static NOT_RETRYABLE: &'static str = "NOT_RETRYABLE"; +static NOT_CLONABLE: &'static str = "NOT_CLONABLE"; + +#[derive(Clone)] +struct TestPolicy; + +impl hedge::Policy for TestPolicy { + fn can_retry(&self, req: &Req) -> bool { + *req != NOT_RETRYABLE + } + + fn clone_request(&self, req: &Req) -> Option { + if *req == NOT_CLONABLE { + None + } else { + Some(req) + } + } +} + +fn new_service + Clone>(policy: P) -> (Hedge, Handle) { + let (service, handle) = tower_test::mock::pair(); + + let mock_latencies: [u64; 10] = [1, 1, 1, 1, 1, 1, 1, 1, 10, 10]; + + let service = Hedge::new_with_mock_latencies( + service, + policy, + 10, + 0.9, + Duration::from_secs(60), + &mock_latencies, + ); + (service, handle) +} diff --git a/tower-hedge/tests/support/mod.rs b/tower-hedge/tests/support/mod.rs new file mode 100644 index 0000000..01955f8 --- /dev/null +++ b/tower-hedge/tests/support/mod.rs @@ -0,0 +1,264 @@ +// Shamelessly copied verbatim from +// https://github.com/tokio-rs/tokio/blob/master/tokio-timer/tests/support/mod.rs + +#![allow(unused_macros, unused_imports, dead_code, deprecated)] + +use tokio_executor::park::{Park, Unpark}; +use tokio_timer::clock::Now; +use tokio_timer::timer::Timer; + +use futures::future::{lazy, Future}; + +use std::marker::PhantomData; +use std::rc::Rc; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +macro_rules! assert_ready { + ($f:expr) => {{ + use ::futures::Async::*; + + match $f.poll().unwrap() { + Ready(v) => v, + NotReady => panic!("NotReady"), + } + }}; + ($f:expr, $($msg:expr),+) => {{ + use ::futures::Async::*; + + match $f.poll().unwrap() { + Ready(v) => v, + NotReady => { + let msg = format!($($msg),+); + panic!("NotReady; {}", msg) + } + } + }} +} + +macro_rules! assert_ready_eq { + ($f:expr, $expect:expr) => { + assert_eq!($f.poll().unwrap(), ::futures::Async::Ready($expect)); + }; +} + +macro_rules! assert_not_ready { + ($f:expr) => {{ + let res = $f.poll().unwrap(); + assert!(!res.is_ready(), "actual={:?}", res) + }}; + ($f:expr, $($msg:expr),+) => {{ + let res = $f.poll().unwrap(); + if res.is_ready() { + let msg = format!($($msg),+); + panic!("actual={:?}; {}", res, msg); + } + }}; +} + +macro_rules! assert_elapsed { + ($f:expr) => { + assert!($f.poll().unwrap_err().is_elapsed()); + }; +} + +#[derive(Debug)] +pub struct MockTime { + inner: Inner, + _p: PhantomData>, +} + +#[derive(Debug)] +pub struct MockNow { + inner: Inner, +} + +#[derive(Debug)] +pub struct MockPark { + inner: Inner, + _p: PhantomData>, +} + +#[derive(Debug)] +pub struct MockUnpark { + inner: Inner, +} + +type Inner = Arc>; + +#[derive(Debug)] +struct State { + base: Instant, + advance: Duration, + unparked: bool, + park_for: Option, +} + +pub fn ms(num: u64) -> Duration { + Duration::from_millis(num) +} + +pub trait IntoTimeout { + fn into_timeout(self) -> Option; +} + +impl IntoTimeout for Option { + fn into_timeout(self) -> Self { + self + } +} + +impl IntoTimeout for Duration { + fn into_timeout(self) -> Option { + Some(self) + } +} + +/// Turn the timer state once +pub fn turn(timer: &mut Timer, duration: T) { + timer.turn(duration.into_timeout()).unwrap(); +} + +/// Advance the timer the specified amount +pub fn advance(timer: &mut Timer, duration: Duration) { + let inner = timer.get_park().inner.clone(); + let deadline = inner.lock().unwrap().now() + duration; + + while inner.lock().unwrap().now() < deadline { + let dur = deadline - inner.lock().unwrap().now(); + turn(timer, dur); + } +} + +pub fn mocked(f: F) -> R +where + F: FnOnce(&mut Timer, &mut MockTime) -> R, +{ + mocked_with_now(Instant::now(), f) +} + +pub fn mocked_with_now(now: Instant, f: F) -> R +where + F: FnOnce(&mut Timer, &mut MockTime) -> R, +{ + let mut time = MockTime::new(now); + let park = time.mock_park(); + let now = ::tokio_timer::clock::Clock::new_with_now(time.mock_now()); + + let mut enter = ::tokio_executor::enter().unwrap(); + + ::tokio_timer::clock::with_default(&now, &mut enter, |enter| { + let mut timer = Timer::new(park); + let handle = timer.handle(); + + ::tokio_timer::with_default(&handle, enter, |_| { + lazy(|| Ok::<_, ()>(f(&mut timer, &mut time))) + .wait() + .unwrap() + }) + }) +} + +impl MockTime { + pub fn new(now: Instant) -> MockTime { + let state = State { + base: now, + advance: Duration::default(), + unparked: false, + park_for: None, + }; + + MockTime { + inner: Arc::new(Mutex::new(state)), + _p: PhantomData, + } + } + + pub fn mock_now(&self) -> MockNow { + let inner = self.inner.clone(); + MockNow { inner } + } + + pub fn mock_park(&self) -> MockPark { + let inner = self.inner.clone(); + MockPark { + inner, + _p: PhantomData, + } + } + + pub fn now(&self) -> Instant { + self.inner.lock().unwrap().now() + } + + /// Returns the total amount of time the time has been advanced. + pub fn advanced(&self) -> Duration { + self.inner.lock().unwrap().advance + } + + pub fn advance(&self, duration: Duration) { + let mut inner = self.inner.lock().unwrap(); + inner.advance(duration); + } + + /// The next call to park_timeout will be for this duration, regardless of + /// the timeout passed to `park_timeout`. + pub fn park_for(&self, duration: Duration) { + self.inner.lock().unwrap().park_for = Some(duration); + } +} + +impl Park for MockPark { + type Unpark = MockUnpark; + type Error = (); + + fn unpark(&self) -> Self::Unpark { + let inner = self.inner.clone(); + MockUnpark { inner } + } + + fn park(&mut self) -> Result<(), Self::Error> { + let mut inner = self.inner.lock().map_err(|_| ())?; + + let duration = inner.park_for.take().expect("call park_for first"); + + inner.advance(duration); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + let mut inner = self.inner.lock().unwrap(); + + if let Some(duration) = inner.park_for.take() { + inner.advance(duration); + } else { + inner.advance(duration); + } + + Ok(()) + } +} + +impl Unpark for MockUnpark { + fn unpark(&self) { + if let Ok(mut inner) = self.inner.lock() { + inner.unparked = true; + } + } +} + +impl Now for MockNow { + fn now(&self) -> Instant { + self.inner.lock().unwrap().now() + } +} + +impl State { + fn now(&self) -> Instant { + self.base + self.advance + } + + fn advance(&mut self, duration: Duration) { + self.advance += duration; + } +}