Add hedge retry middleware (#236)
Add tower-hedge, a layer that preemptively retries requests which have been outstanding for longer than a given latency percentile. If either of the original future or the retry future completes, that value is used. For more information about hedge requests, see: [The Tail at Scale][1] [1]: https://cseweb.ucsd.edu/~gmporter/classes/fa17/cse124/post/schedule/p74-dean.pdf Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
parent
716bafd922
commit
73c74252e6
|
@ -6,6 +6,7 @@ members = [
|
||||||
"tower-buffer",
|
"tower-buffer",
|
||||||
"tower-discover",
|
"tower-discover",
|
||||||
"tower-filter",
|
"tower-filter",
|
||||||
|
"tower-hedge",
|
||||||
"tower-layer",
|
"tower-layer",
|
||||||
"tower-limit",
|
"tower-limit",
|
||||||
"tower-load-shed",
|
"tower-load-shed",
|
||||||
|
|
|
@ -16,7 +16,7 @@ use crate::{error::Error, future::ResponseFuture};
|
||||||
use futures::Poll;
|
use futures::Poll;
|
||||||
use tower_service::Service;
|
use tower_service::Service;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Filter<T, U> {
|
pub struct Filter<T, U> {
|
||||||
inner: T,
|
inner: T,
|
||||||
predicate: U,
|
predicate: U,
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
[package]
|
||||||
|
name = "tower-hedge"
|
||||||
|
version = "0.1.0"
|
||||||
|
authors = ["Alex Leong <adlleong@gmail.com>"]
|
||||||
|
publish = false
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
futures = "0.1"
|
||||||
|
hdrhistogram = "6.0"
|
||||||
|
log = "0.4.1"
|
||||||
|
tower-service = "0.2.0"
|
||||||
|
tower-filter = { version = "0.1", path = "../tower-filter" }
|
||||||
|
tokio-mock-task = { git = "https://github.com/carllerche/tokio-mock-task" }
|
||||||
|
tokio-timer = "0.2.6"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tower-test = { version = "0.1", path = "../tower-test" }
|
||||||
|
tokio-executor = "0.1.2"
|
|
@ -0,0 +1,99 @@
|
||||||
|
use futures::{Async, Future, Poll};
|
||||||
|
use tower_service::Service;
|
||||||
|
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
/// A policy which specifies how long each request should be delayed for.
|
||||||
|
pub trait Policy<Request> {
|
||||||
|
fn delay(&self, req: &Request) -> Duration;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A middleware which delays sending the request to the underlying service
|
||||||
|
/// for an amount of time specified by the policy.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Delay<P, S> {
|
||||||
|
policy: P,
|
||||||
|
service: S,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ResponseFuture<Request, S, F> {
|
||||||
|
service: S,
|
||||||
|
state: State<Request, F>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum State<Request, F> {
|
||||||
|
Delaying(tokio_timer::Delay, Option<Request>),
|
||||||
|
Called(F),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<P, S> Delay<P, S> {
|
||||||
|
pub fn new<Request>(policy: P, service: S) -> Self
|
||||||
|
where
|
||||||
|
P: Policy<Request>,
|
||||||
|
S: Service<Request> + Clone,
|
||||||
|
S::Error: Into<super::Error>,
|
||||||
|
{
|
||||||
|
Delay { policy, service }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Request, P, S> Service<Request> for Delay<P, S>
|
||||||
|
where
|
||||||
|
P: Policy<Request>,
|
||||||
|
S: Service<Request> + Clone,
|
||||||
|
S::Error: Into<super::Error>,
|
||||||
|
{
|
||||||
|
type Response = S::Response;
|
||||||
|
type Error = super::Error;
|
||||||
|
type Future = ResponseFuture<Request, S, S::Future>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||||
|
self.service.poll_ready().map_err(|e| e.into())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, request: Request) -> Self::Future {
|
||||||
|
let deadline = tokio_timer::clock::now() + self.policy.delay(&request);
|
||||||
|
let mut cloned = self.service.clone();
|
||||||
|
// Pass the original service to the ResponseFuture and keep the cloned service on self.
|
||||||
|
let orig = {
|
||||||
|
std::mem::swap(&mut cloned, &mut self.service);
|
||||||
|
cloned
|
||||||
|
};
|
||||||
|
ResponseFuture {
|
||||||
|
service: orig,
|
||||||
|
state: State::Delaying(tokio_timer::Delay::new(deadline), Some(request)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Request, S, F> Future for ResponseFuture<Request, S, F>
|
||||||
|
where
|
||||||
|
F: Future,
|
||||||
|
F::Error: Into<super::Error>,
|
||||||
|
S: Service<Request, Future = F, Response = F::Item, Error = F::Error>,
|
||||||
|
{
|
||||||
|
type Item = F::Item;
|
||||||
|
type Error = super::Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
loop {
|
||||||
|
let next = match self.state {
|
||||||
|
State::Delaying(ref mut delay, ref mut req) => match delay.poll() {
|
||||||
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
|
Ok(Async::Ready(())) => {
|
||||||
|
let req = req.take().expect("Missing request in delay");
|
||||||
|
let fut = self.service.call(req);
|
||||||
|
State::Called(fut)
|
||||||
|
}
|
||||||
|
Err(e) => return Err(e.into()),
|
||||||
|
},
|
||||||
|
State::Called(ref mut fut) => {
|
||||||
|
return fut.poll().map_err(|e| e.into());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
self.state = next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,85 @@
|
||||||
|
use futures::{Async, Future, Poll};
|
||||||
|
use tokio_timer::clock;
|
||||||
|
use tower_service::Service;
|
||||||
|
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
/// Record is the interface for accepting request latency measurements. When
|
||||||
|
/// a request completes, record is called with the elapsed duration between
|
||||||
|
/// when the service was called and when the future completed.
|
||||||
|
pub trait Record {
|
||||||
|
fn record(&mut self, latency: Duration);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Latency is a middleware that measures request latency and records it to the
|
||||||
|
/// provided Record instance.
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct Latency<R, S> {
|
||||||
|
rec: R,
|
||||||
|
service: S,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ResponseFuture<R, F> {
|
||||||
|
start: Instant,
|
||||||
|
rec: R,
|
||||||
|
inner: F,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, R> Latency<R, S>
|
||||||
|
where
|
||||||
|
R: Record + Clone,
|
||||||
|
{
|
||||||
|
pub fn new<Request>(rec: R, service: S) -> Self
|
||||||
|
where
|
||||||
|
S: Service<Request>,
|
||||||
|
S::Error: Into<super::Error>,
|
||||||
|
{
|
||||||
|
Latency { rec, service }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, R, Request> Service<Request> for Latency<R, S>
|
||||||
|
where
|
||||||
|
S: Service<Request>,
|
||||||
|
S::Error: Into<super::Error>,
|
||||||
|
R: Record + Clone,
|
||||||
|
{
|
||||||
|
type Response = S::Response;
|
||||||
|
type Error = super::Error;
|
||||||
|
type Future = ResponseFuture<R, S::Future>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||||
|
self.service.poll_ready().map_err(|e| e.into())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, request: Request) -> Self::Future {
|
||||||
|
ResponseFuture {
|
||||||
|
start: clock::now(),
|
||||||
|
rec: self.rec.clone(),
|
||||||
|
inner: self.service.call(request),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R, F> Future for ResponseFuture<R, F>
|
||||||
|
where
|
||||||
|
R: Record,
|
||||||
|
F: Future,
|
||||||
|
F::Error: Into<super::Error>,
|
||||||
|
{
|
||||||
|
type Item = F::Item;
|
||||||
|
type Error = super::Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
match self.inner.poll() {
|
||||||
|
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||||
|
Ok(Async::Ready(rsp)) => {
|
||||||
|
let duration = clock::now() - self.start;
|
||||||
|
self.rec.record(duration);
|
||||||
|
Ok(Async::Ready(rsp))
|
||||||
|
}
|
||||||
|
Err(e) => Err(e.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,255 @@
|
||||||
|
//! Pre-emptively retry requests which have been outstanding for longer
|
||||||
|
//! than a given latency percentile.
|
||||||
|
|
||||||
|
#![deny(warnings)]
|
||||||
|
#![deny(missing_docs)]
|
||||||
|
extern crate futures;
|
||||||
|
extern crate hdrhistogram;
|
||||||
|
#[macro_use]
|
||||||
|
extern crate log;
|
||||||
|
extern crate tokio_timer;
|
||||||
|
extern crate tower_filter;
|
||||||
|
extern crate tower_service;
|
||||||
|
|
||||||
|
use futures::future::FutureResult;
|
||||||
|
use futures::{future, Poll};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::Duration;
|
||||||
|
use tower_filter::Filter;
|
||||||
|
|
||||||
|
mod delay;
|
||||||
|
mod latency;
|
||||||
|
mod rotating_histogram;
|
||||||
|
mod select;
|
||||||
|
|
||||||
|
use delay::Delay;
|
||||||
|
use latency::Latency;
|
||||||
|
use rotating_histogram::RotatingHistogram;
|
||||||
|
use select::Select;
|
||||||
|
|
||||||
|
type Histo = Arc<Mutex<RotatingHistogram>>;
|
||||||
|
type Service<S, P> = select::Select<
|
||||||
|
SelectPolicy<P>,
|
||||||
|
Latency<Histo, S>,
|
||||||
|
Delay<DelayPolicy, Filter<Latency<Histo, S>, PolicyPredicate<P>>>,
|
||||||
|
>;
|
||||||
|
/// A middleware that pre-emptively retries requests which have been outstanding
|
||||||
|
/// for longer than a given latency percentile. If either of the original
|
||||||
|
/// future or the retry future completes, that value is used.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Hedge<S, P>(Service<S, P>);
|
||||||
|
/// The Future returned by the hedge Service.
|
||||||
|
pub struct Future<S, P, Request>(<Service<S, P> as tower_service::Service<Request>>::Future)
|
||||||
|
where
|
||||||
|
S: tower_service::Service<Request> + Clone,
|
||||||
|
S::Error: Into<Error>,
|
||||||
|
P: Policy<Request> + Clone;
|
||||||
|
|
||||||
|
type Error = Box<dyn std::error::Error + Send + Sync>;
|
||||||
|
|
||||||
|
/// A policy which describes which requests can be cloned and then whether those
|
||||||
|
/// requests should be retried.
|
||||||
|
pub trait Policy<Request> {
|
||||||
|
/// clone_request is called when the request is first received to determine
|
||||||
|
/// if the request is retryable.
|
||||||
|
fn clone_request(&self, req: &Request) -> Option<Request>;
|
||||||
|
/// can_retry is called after the hedge timeout to determine if the hedge
|
||||||
|
/// retry should be issued.
|
||||||
|
fn can_retry(&self, req: &Request) -> bool;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
struct PolicyPredicate<P>(P);
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct DelayPolicy {
|
||||||
|
histo: Histo,
|
||||||
|
latency_percentile: f32,
|
||||||
|
}
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct SelectPolicy<P> {
|
||||||
|
policy: P,
|
||||||
|
histo: Histo,
|
||||||
|
min_data_points: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, P> Hedge<S, P> {
|
||||||
|
/// Create a new hedge middleware.
|
||||||
|
pub fn new<Request>(
|
||||||
|
service: S,
|
||||||
|
policy: P,
|
||||||
|
min_data_points: u64,
|
||||||
|
latency_percentile: f32,
|
||||||
|
period: Duration,
|
||||||
|
) -> Hedge<S, P>
|
||||||
|
where
|
||||||
|
S: tower_service::Service<Request> + Clone,
|
||||||
|
S::Error: Into<Error>,
|
||||||
|
P: Policy<Request> + Clone,
|
||||||
|
{
|
||||||
|
let histo = Arc::new(Mutex::new(RotatingHistogram::new(period)));
|
||||||
|
Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A hedge middleware with a prepopulated latency histogram. This is usedful
|
||||||
|
/// for integration tests.
|
||||||
|
pub fn new_with_mock_latencies<Request>(
|
||||||
|
service: S,
|
||||||
|
policy: P,
|
||||||
|
min_data_points: u64,
|
||||||
|
latency_percentile: f32,
|
||||||
|
period: Duration,
|
||||||
|
latencies_ms: &[u64],
|
||||||
|
) -> Hedge<S, P>
|
||||||
|
where
|
||||||
|
S: tower_service::Service<Request> + Clone,
|
||||||
|
S::Error: Into<Error>,
|
||||||
|
P: Policy<Request> + Clone,
|
||||||
|
{
|
||||||
|
let histo = Arc::new(Mutex::new(RotatingHistogram::new(period)));
|
||||||
|
{
|
||||||
|
let mut locked = histo.lock().unwrap();
|
||||||
|
for latency in latencies_ms.iter() {
|
||||||
|
locked.read().record(*latency).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Self::new_with_histo(service, policy, min_data_points, latency_percentile, histo)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new_with_histo<Request>(
|
||||||
|
service: S,
|
||||||
|
policy: P,
|
||||||
|
min_data_points: u64,
|
||||||
|
latency_percentile: f32,
|
||||||
|
histo: Histo,
|
||||||
|
) -> Hedge<S, P>
|
||||||
|
where
|
||||||
|
S: tower_service::Service<Request> + Clone,
|
||||||
|
S::Error: Into<Error>,
|
||||||
|
P: Policy<Request> + Clone,
|
||||||
|
{
|
||||||
|
// Clone the underlying service and wrap both copies in a middleware that
|
||||||
|
// records the latencies in a rotating histogram.
|
||||||
|
let recorded_a = Latency::new(histo.clone(), service.clone());
|
||||||
|
let recorded_b = Latency::new(histo.clone(), service);
|
||||||
|
|
||||||
|
// Check policy to see if the hedge request should be issued.
|
||||||
|
let filtered = Filter::new(recorded_b, PolicyPredicate(policy.clone()));
|
||||||
|
|
||||||
|
// Delay the second request by a percentile of the recorded request latency
|
||||||
|
// histogram.
|
||||||
|
let delay_policy = DelayPolicy {
|
||||||
|
histo: histo.clone(),
|
||||||
|
latency_percentile,
|
||||||
|
};
|
||||||
|
let delayed = Delay::new(delay_policy, filtered);
|
||||||
|
|
||||||
|
// If the request is retryable, issue two requests -- the second one delayed
|
||||||
|
// by a latency percentile. Use the first result to complete.
|
||||||
|
let select_policy = SelectPolicy {
|
||||||
|
policy,
|
||||||
|
histo,
|
||||||
|
min_data_points,
|
||||||
|
};
|
||||||
|
Hedge(Select::new(select_policy, recorded_a, delayed))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, P, Request> tower_service::Service<Request> for Hedge<S, P>
|
||||||
|
where
|
||||||
|
S: tower_service::Service<Request> + Clone,
|
||||||
|
S::Error: Into<Error>,
|
||||||
|
P: Policy<Request> + Clone,
|
||||||
|
{
|
||||||
|
type Response = S::Response;
|
||||||
|
type Error = Error;
|
||||||
|
type Future = Future<S, P, Request>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||||
|
self.0.poll_ready()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, request: Request) -> Self::Future {
|
||||||
|
Future(self.0.call(request))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, P, Request> futures::Future for Future<S, P, Request>
|
||||||
|
where
|
||||||
|
S: tower_service::Service<Request> + Clone,
|
||||||
|
S::Error: Into<Error>,
|
||||||
|
P: Policy<Request> + Clone,
|
||||||
|
{
|
||||||
|
type Item = S::Response;
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
self.0.poll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Remove when Duration::as_millis() becomes stable.
|
||||||
|
const NANOS_PER_MILLI: u32 = 1_000_000;
|
||||||
|
const MILLIS_PER_SEC: u64 = 1_000;
|
||||||
|
fn millis(duration: Duration) -> u64 {
|
||||||
|
// Round up.
|
||||||
|
let millis = (duration.subsec_nanos() + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI;
|
||||||
|
duration
|
||||||
|
.as_secs()
|
||||||
|
.saturating_mul(MILLIS_PER_SEC)
|
||||||
|
.saturating_add(u64::from(millis))
|
||||||
|
}
|
||||||
|
|
||||||
|
impl latency::Record for Histo {
|
||||||
|
fn record(&mut self, latency: Duration) {
|
||||||
|
let mut locked = self.lock().unwrap();
|
||||||
|
locked.write().record(millis(latency)).unwrap_or_else(|e| {
|
||||||
|
error!("Failed to write to hedge histogram: {:?}", e);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<P, Request> tower_filter::Predicate<Request> for PolicyPredicate<P>
|
||||||
|
where
|
||||||
|
P: Policy<Request>,
|
||||||
|
{
|
||||||
|
type Future = future::Either<
|
||||||
|
FutureResult<(), tower_filter::error::Error>,
|
||||||
|
future::Empty<(), tower_filter::error::Error>,
|
||||||
|
>;
|
||||||
|
|
||||||
|
fn check(&mut self, request: &Request) -> Self::Future {
|
||||||
|
if self.0.can_retry(request) {
|
||||||
|
future::Either::A(future::ok(()))
|
||||||
|
} else {
|
||||||
|
// If the hedge retry should not be issued, we simply want to wait
|
||||||
|
// for the result of the original request. Therefore we don't want
|
||||||
|
// to return an error here. Instead, we use future::empty to ensure
|
||||||
|
// that the original request wins the select.
|
||||||
|
future::Either::B(future::empty())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Request> delay::Policy<Request> for DelayPolicy {
|
||||||
|
fn delay(&self, _req: &Request) -> Duration {
|
||||||
|
let mut locked = self.histo.lock().unwrap();
|
||||||
|
let millis = locked
|
||||||
|
.read()
|
||||||
|
.value_at_quantile(self.latency_percentile.into());
|
||||||
|
Duration::from_millis(millis)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<P, Request> select::Policy<Request> for SelectPolicy<P>
|
||||||
|
where
|
||||||
|
P: Policy<Request>,
|
||||||
|
{
|
||||||
|
fn clone_request(&self, req: &Request) -> Option<Request> {
|
||||||
|
self.policy.clone_request(req).filter(|_| {
|
||||||
|
let mut locked = self.histo.lock().unwrap();
|
||||||
|
// Do not attempt a retry if there are insufficiently many data
|
||||||
|
// points in the histogram.
|
||||||
|
locked.read().len() >= self.min_data_points
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,75 @@
|
||||||
|
extern crate tokio_timer;
|
||||||
|
|
||||||
|
use hdrhistogram::Histogram;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use tokio_timer::clock;
|
||||||
|
|
||||||
|
/// This represents a "rotating" histogram which stores two histogram, one which
|
||||||
|
/// should be read and one which should be written to. Every period, the read
|
||||||
|
/// histogram is discarded and replaced by the write histogram. The idea here
|
||||||
|
/// is that the read histogram should always contain a full period (the previous
|
||||||
|
/// period) of write operations.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct RotatingHistogram {
|
||||||
|
read: Histogram<u64>,
|
||||||
|
write: Histogram<u64>,
|
||||||
|
last_rotation: Instant,
|
||||||
|
period: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RotatingHistogram {
|
||||||
|
pub fn new(period: Duration) -> RotatingHistogram {
|
||||||
|
RotatingHistogram {
|
||||||
|
read: Histogram::<u64>::new_with_bounds(1, 10_000, 3)
|
||||||
|
.expect("Invalid histogram params"),
|
||||||
|
write: Histogram::<u64>::new_with_bounds(1, 10_000, 3)
|
||||||
|
.expect("Invalid histogram params"),
|
||||||
|
last_rotation: clock::now(),
|
||||||
|
period,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn read(&mut self) -> &mut Histogram<u64> {
|
||||||
|
self.maybe_rotate();
|
||||||
|
&mut self.read
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write(&mut self) -> &mut Histogram<u64> {
|
||||||
|
self.maybe_rotate();
|
||||||
|
&mut self.write
|
||||||
|
}
|
||||||
|
|
||||||
|
fn maybe_rotate(&mut self) {
|
||||||
|
let delta = clock::now() - self.last_rotation;
|
||||||
|
// TODO: replace with delta.duration_div when it becomes stable.
|
||||||
|
let rotations = (nanos(delta) / nanos(self.period)) as u32;
|
||||||
|
if rotations >= 2 {
|
||||||
|
trace!("Time since last rotation is {:?}. clearing!", delta);
|
||||||
|
self.clear();
|
||||||
|
} else if rotations == 1 {
|
||||||
|
trace!("Time since last rotation is {:?}. rotating!", delta);
|
||||||
|
self.rotate();
|
||||||
|
}
|
||||||
|
self.last_rotation += self.period * rotations;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rotate(&mut self) {
|
||||||
|
std::mem::swap(&mut self.read, &mut self.write);
|
||||||
|
trace!("Rotated {:?} points into read", self.read.len());
|
||||||
|
self.write.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clear(&mut self) {
|
||||||
|
self.read.clear();
|
||||||
|
self.write.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const NANOS_PER_SEC: u64 = 1_000_000_000;
|
||||||
|
fn nanos(duration: Duration) -> u64 {
|
||||||
|
duration
|
||||||
|
.as_secs()
|
||||||
|
.saturating_mul(NANOS_PER_SEC)
|
||||||
|
.saturating_add(u64::from(duration.subsec_nanos()))
|
||||||
|
}
|
|
@ -0,0 +1,100 @@
|
||||||
|
use futures::{Async, Future, Poll};
|
||||||
|
use tower_service::Service;
|
||||||
|
|
||||||
|
/// A policy which decides which requests can be cloned and sent to the B
|
||||||
|
/// service.
|
||||||
|
pub trait Policy<Request> {
|
||||||
|
fn clone_request(&self, req: &Request) -> Option<Request>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Select is a middleware which attempts to clone the request and sends the
|
||||||
|
/// original request to the A service and, if the request was able to be cloned,
|
||||||
|
/// the cloned request to the B service. Both resulting futures will be polled
|
||||||
|
/// and whichever future completes first will be used as the result.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Select<P, A, B> {
|
||||||
|
policy: P,
|
||||||
|
a: A,
|
||||||
|
b: B,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ResponseFuture<AF, BF> {
|
||||||
|
a_fut: AF,
|
||||||
|
b_fut: Option<BF>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<P, A, B> Select<P, A, B> {
|
||||||
|
pub fn new<Request>(policy: P, a: A, b: B) -> Self
|
||||||
|
where
|
||||||
|
P: Policy<Request>,
|
||||||
|
A: Service<Request>,
|
||||||
|
A::Error: Into<super::Error>,
|
||||||
|
B: Service<Request, Response = A::Response>,
|
||||||
|
B::Error: Into<super::Error>,
|
||||||
|
{
|
||||||
|
Select { policy, a, b }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<P, A, B, Request> Service<Request> for Select<P, A, B>
|
||||||
|
where
|
||||||
|
P: Policy<Request>,
|
||||||
|
A: Service<Request>,
|
||||||
|
A::Error: Into<super::Error>,
|
||||||
|
B: Service<Request, Response = A::Response>,
|
||||||
|
B::Error: Into<super::Error>,
|
||||||
|
{
|
||||||
|
type Response = A::Response;
|
||||||
|
type Error = super::Error;
|
||||||
|
type Future = ResponseFuture<A::Future, B::Future>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||||
|
let a = self.a.poll_ready().map_err(|e| e.into())?;
|
||||||
|
let b = self.b.poll_ready().map_err(|e| e.into())?;
|
||||||
|
if a.is_ready() && b.is_ready() {
|
||||||
|
Ok(Async::Ready(()))
|
||||||
|
} else {
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, request: Request) -> Self::Future {
|
||||||
|
let b_fut = if let Some(cloned_req) = self.policy.clone_request(&request) {
|
||||||
|
Some(self.b.call(cloned_req))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
ResponseFuture {
|
||||||
|
a_fut: self.a.call(request),
|
||||||
|
b_fut,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<AF, BF> Future for ResponseFuture<AF, BF>
|
||||||
|
where
|
||||||
|
AF: Future,
|
||||||
|
AF::Error: Into<super::Error>,
|
||||||
|
BF: Future<Item = AF::Item>,
|
||||||
|
BF::Error: Into<super::Error>,
|
||||||
|
{
|
||||||
|
type Item = AF::Item;
|
||||||
|
type Error = super::Error;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
match self.a_fut.poll() {
|
||||||
|
Ok(Async::NotReady) => {}
|
||||||
|
Ok(Async::Ready(a)) => return Ok(Async::Ready(a)),
|
||||||
|
Err(e) => return Err(e.into()),
|
||||||
|
}
|
||||||
|
if let Some(ref mut b_fut) = self.b_fut {
|
||||||
|
match b_fut.poll() {
|
||||||
|
Ok(Async::NotReady) => {}
|
||||||
|
Ok(Async::Ready(b)) => return Ok(Async::Ready(b)),
|
||||||
|
Err(e) => return Err(e.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Ok(Async::NotReady);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,177 @@
|
||||||
|
extern crate futures;
|
||||||
|
extern crate tokio_executor;
|
||||||
|
extern crate tokio_mock_task;
|
||||||
|
extern crate tokio_timer;
|
||||||
|
extern crate tower_hedge as hedge;
|
||||||
|
extern crate tower_service;
|
||||||
|
extern crate tower_test;
|
||||||
|
|
||||||
|
#[macro_use]
|
||||||
|
mod support;
|
||||||
|
use support::*;
|
||||||
|
|
||||||
|
use futures::Future;
|
||||||
|
use hedge::{Hedge, Policy};
|
||||||
|
use tower_service::Service;
|
||||||
|
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn hedge_orig_completes_first() {
|
||||||
|
let (mut service, mut handle) = new_service(TestPolicy);
|
||||||
|
|
||||||
|
mocked(|timer, _| {
|
||||||
|
let mut fut = service.call("orig");
|
||||||
|
// Check that orig request has been issued.
|
||||||
|
let (_, req) = handle.next_request().expect("orig");
|
||||||
|
// Check fut is not ready.
|
||||||
|
assert!(fut.poll().unwrap().is_not_ready());
|
||||||
|
|
||||||
|
// Check hedge has not been issued.
|
||||||
|
assert!(handle.poll_request().unwrap().is_not_ready());
|
||||||
|
advance(timer, ms(10));
|
||||||
|
// Check fut is not ready.
|
||||||
|
assert!(fut.poll().unwrap().is_not_ready());
|
||||||
|
// Check that the hedge has been issued.
|
||||||
|
let (_, _hedge_req) = handle.next_request().expect("hedge");
|
||||||
|
|
||||||
|
req.send_response("orig-done");
|
||||||
|
// Check that fut gets orig response.
|
||||||
|
assert_eq!(fut.wait().unwrap(), "orig-done");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn hedge_hedge_completes_first() {
|
||||||
|
let (mut service, mut handle) = new_service(TestPolicy);
|
||||||
|
|
||||||
|
mocked(|timer, _| {
|
||||||
|
let mut fut = service.call("orig");
|
||||||
|
// Check that orig request has been issued.
|
||||||
|
let (_, _req) = handle.next_request().expect("orig");
|
||||||
|
// Check fut is not ready.
|
||||||
|
assert!(fut.poll().unwrap().is_not_ready());
|
||||||
|
|
||||||
|
// Check hedge has not been issued.
|
||||||
|
assert!(handle.poll_request().unwrap().is_not_ready());
|
||||||
|
advance(timer, ms(10));
|
||||||
|
// Check fut is not ready.
|
||||||
|
assert!(fut.poll().unwrap().is_not_ready());
|
||||||
|
|
||||||
|
// Check that the hedge has been issued.
|
||||||
|
let (_, hedge_req) = handle.next_request().expect("hedge");
|
||||||
|
hedge_req.send_response("hedge-done");
|
||||||
|
// Check that fut gets hedge response.
|
||||||
|
assert_eq!(fut.wait().unwrap(), "hedge-done");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn completes_before_hedge() {
|
||||||
|
let (mut service, mut handle) = new_service(TestPolicy);
|
||||||
|
|
||||||
|
mocked(|_, _| {
|
||||||
|
let mut fut = service.call("orig");
|
||||||
|
// Check that orig request has been issued.
|
||||||
|
let (_, req) = handle.next_request().expect("orig");
|
||||||
|
// Check fut is not ready.
|
||||||
|
assert!(fut.poll().unwrap().is_not_ready());
|
||||||
|
|
||||||
|
req.send_response("orig-done");
|
||||||
|
// Check hedge has not been issued.
|
||||||
|
assert!(handle.poll_request().unwrap().is_not_ready());
|
||||||
|
// Check that fut gets orig response.
|
||||||
|
assert_eq!(fut.wait().unwrap(), "orig-done");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn request_not_retyable() {
|
||||||
|
let (mut service, mut handle) = new_service(TestPolicy);
|
||||||
|
|
||||||
|
mocked(|timer, _| {
|
||||||
|
let mut fut = service.call(NOT_RETRYABLE);
|
||||||
|
// Check that orig request has been issued.
|
||||||
|
let (_, req) = handle.next_request().expect("orig");
|
||||||
|
// Check fut is not ready.
|
||||||
|
assert!(fut.poll().unwrap().is_not_ready());
|
||||||
|
|
||||||
|
// Check hedge has not been issued.
|
||||||
|
assert!(handle.poll_request().unwrap().is_not_ready());
|
||||||
|
advance(timer, ms(10));
|
||||||
|
// Check fut is not ready.
|
||||||
|
assert!(fut.poll().unwrap().is_not_ready());
|
||||||
|
// Check hedge has not been issued.
|
||||||
|
assert!(handle.poll_request().unwrap().is_not_ready());
|
||||||
|
|
||||||
|
req.send_response("orig-done");
|
||||||
|
// Check that fut gets orig response.
|
||||||
|
assert_eq!(fut.wait().unwrap(), "orig-done");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn request_not_clonable() {
|
||||||
|
let (mut service, mut handle) = new_service(TestPolicy);
|
||||||
|
|
||||||
|
mocked(|timer, _| {
|
||||||
|
let mut fut = service.call(NOT_CLONABLE);
|
||||||
|
// Check that orig request has been issued.
|
||||||
|
let (_, req) = handle.next_request().expect("orig");
|
||||||
|
// Check fut is not ready.
|
||||||
|
assert!(fut.poll().unwrap().is_not_ready());
|
||||||
|
|
||||||
|
// Check hedge has not been issued.
|
||||||
|
assert!(handle.poll_request().unwrap().is_not_ready());
|
||||||
|
advance(timer, ms(10));
|
||||||
|
// Check fut is not ready.
|
||||||
|
assert!(fut.poll().unwrap().is_not_ready());
|
||||||
|
// Check hedge has not been issued.
|
||||||
|
assert!(handle.poll_request().unwrap().is_not_ready());
|
||||||
|
|
||||||
|
req.send_response("orig-done");
|
||||||
|
// Check that fut gets orig response.
|
||||||
|
assert_eq!(fut.wait().unwrap(), "orig-done");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
type Req = &'static str;
|
||||||
|
type Res = &'static str;
|
||||||
|
type Mock = tower_test::mock::Mock<Req, Res>;
|
||||||
|
type Handle = tower_test::mock::Handle<Req, Res>;
|
||||||
|
|
||||||
|
static NOT_RETRYABLE: &'static str = "NOT_RETRYABLE";
|
||||||
|
static NOT_CLONABLE: &'static str = "NOT_CLONABLE";
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct TestPolicy;
|
||||||
|
|
||||||
|
impl hedge::Policy<Req> for TestPolicy {
|
||||||
|
fn can_retry(&self, req: &Req) -> bool {
|
||||||
|
*req != NOT_RETRYABLE
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clone_request(&self, req: &Req) -> Option<Req> {
|
||||||
|
if *req == NOT_CLONABLE {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new_service<P: Policy<Req> + Clone>(policy: P) -> (Hedge<Mock, P>, Handle) {
|
||||||
|
let (service, handle) = tower_test::mock::pair();
|
||||||
|
|
||||||
|
let mock_latencies: [u64; 10] = [1, 1, 1, 1, 1, 1, 1, 1, 10, 10];
|
||||||
|
|
||||||
|
let service = Hedge::new_with_mock_latencies(
|
||||||
|
service,
|
||||||
|
policy,
|
||||||
|
10,
|
||||||
|
0.9,
|
||||||
|
Duration::from_secs(60),
|
||||||
|
&mock_latencies,
|
||||||
|
);
|
||||||
|
(service, handle)
|
||||||
|
}
|
|
@ -0,0 +1,264 @@
|
||||||
|
// Shamelessly copied verbatim from
|
||||||
|
// https://github.com/tokio-rs/tokio/blob/master/tokio-timer/tests/support/mod.rs
|
||||||
|
|
||||||
|
#![allow(unused_macros, unused_imports, dead_code, deprecated)]
|
||||||
|
|
||||||
|
use tokio_executor::park::{Park, Unpark};
|
||||||
|
use tokio_timer::clock::Now;
|
||||||
|
use tokio_timer::timer::Timer;
|
||||||
|
|
||||||
|
use futures::future::{lazy, Future};
|
||||||
|
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
use std::rc::Rc;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
macro_rules! assert_ready {
|
||||||
|
($f:expr) => {{
|
||||||
|
use ::futures::Async::*;
|
||||||
|
|
||||||
|
match $f.poll().unwrap() {
|
||||||
|
Ready(v) => v,
|
||||||
|
NotReady => panic!("NotReady"),
|
||||||
|
}
|
||||||
|
}};
|
||||||
|
($f:expr, $($msg:expr),+) => {{
|
||||||
|
use ::futures::Async::*;
|
||||||
|
|
||||||
|
match $f.poll().unwrap() {
|
||||||
|
Ready(v) => v,
|
||||||
|
NotReady => {
|
||||||
|
let msg = format!($($msg),+);
|
||||||
|
panic!("NotReady; {}", msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! assert_ready_eq {
|
||||||
|
($f:expr, $expect:expr) => {
|
||||||
|
assert_eq!($f.poll().unwrap(), ::futures::Async::Ready($expect));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! assert_not_ready {
|
||||||
|
($f:expr) => {{
|
||||||
|
let res = $f.poll().unwrap();
|
||||||
|
assert!(!res.is_ready(), "actual={:?}", res)
|
||||||
|
}};
|
||||||
|
($f:expr, $($msg:expr),+) => {{
|
||||||
|
let res = $f.poll().unwrap();
|
||||||
|
if res.is_ready() {
|
||||||
|
let msg = format!($($msg),+);
|
||||||
|
panic!("actual={:?}; {}", res, msg);
|
||||||
|
}
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! assert_elapsed {
|
||||||
|
($f:expr) => {
|
||||||
|
assert!($f.poll().unwrap_err().is_elapsed());
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct MockTime {
|
||||||
|
inner: Inner,
|
||||||
|
_p: PhantomData<Rc<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct MockNow {
|
||||||
|
inner: Inner,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct MockPark {
|
||||||
|
inner: Inner,
|
||||||
|
_p: PhantomData<Rc<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct MockUnpark {
|
||||||
|
inner: Inner,
|
||||||
|
}
|
||||||
|
|
||||||
|
type Inner = Arc<Mutex<State>>;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct State {
|
||||||
|
base: Instant,
|
||||||
|
advance: Duration,
|
||||||
|
unparked: bool,
|
||||||
|
park_for: Option<Duration>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn ms(num: u64) -> Duration {
|
||||||
|
Duration::from_millis(num)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait IntoTimeout {
|
||||||
|
fn into_timeout(self) -> Option<Duration>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IntoTimeout for Option<Duration> {
|
||||||
|
fn into_timeout(self) -> Self {
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IntoTimeout for Duration {
|
||||||
|
fn into_timeout(self) -> Option<Duration> {
|
||||||
|
Some(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Turn the timer state once
|
||||||
|
pub fn turn<T: IntoTimeout>(timer: &mut Timer<MockPark>, duration: T) {
|
||||||
|
timer.turn(duration.into_timeout()).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Advance the timer the specified amount
|
||||||
|
pub fn advance(timer: &mut Timer<MockPark>, duration: Duration) {
|
||||||
|
let inner = timer.get_park().inner.clone();
|
||||||
|
let deadline = inner.lock().unwrap().now() + duration;
|
||||||
|
|
||||||
|
while inner.lock().unwrap().now() < deadline {
|
||||||
|
let dur = deadline - inner.lock().unwrap().now();
|
||||||
|
turn(timer, dur);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn mocked<F, R>(f: F) -> R
|
||||||
|
where
|
||||||
|
F: FnOnce(&mut Timer<MockPark>, &mut MockTime) -> R,
|
||||||
|
{
|
||||||
|
mocked_with_now(Instant::now(), f)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn mocked_with_now<F, R>(now: Instant, f: F) -> R
|
||||||
|
where
|
||||||
|
F: FnOnce(&mut Timer<MockPark>, &mut MockTime) -> R,
|
||||||
|
{
|
||||||
|
let mut time = MockTime::new(now);
|
||||||
|
let park = time.mock_park();
|
||||||
|
let now = ::tokio_timer::clock::Clock::new_with_now(time.mock_now());
|
||||||
|
|
||||||
|
let mut enter = ::tokio_executor::enter().unwrap();
|
||||||
|
|
||||||
|
::tokio_timer::clock::with_default(&now, &mut enter, |enter| {
|
||||||
|
let mut timer = Timer::new(park);
|
||||||
|
let handle = timer.handle();
|
||||||
|
|
||||||
|
::tokio_timer::with_default(&handle, enter, |_| {
|
||||||
|
lazy(|| Ok::<_, ()>(f(&mut timer, &mut time)))
|
||||||
|
.wait()
|
||||||
|
.unwrap()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MockTime {
|
||||||
|
pub fn new(now: Instant) -> MockTime {
|
||||||
|
let state = State {
|
||||||
|
base: now,
|
||||||
|
advance: Duration::default(),
|
||||||
|
unparked: false,
|
||||||
|
park_for: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
MockTime {
|
||||||
|
inner: Arc::new(Mutex::new(state)),
|
||||||
|
_p: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn mock_now(&self) -> MockNow {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
MockNow { inner }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn mock_park(&self) -> MockPark {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
MockPark {
|
||||||
|
inner,
|
||||||
|
_p: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn now(&self) -> Instant {
|
||||||
|
self.inner.lock().unwrap().now()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the total amount of time the time has been advanced.
|
||||||
|
pub fn advanced(&self) -> Duration {
|
||||||
|
self.inner.lock().unwrap().advance
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn advance(&self, duration: Duration) {
|
||||||
|
let mut inner = self.inner.lock().unwrap();
|
||||||
|
inner.advance(duration);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The next call to park_timeout will be for this duration, regardless of
|
||||||
|
/// the timeout passed to `park_timeout`.
|
||||||
|
pub fn park_for(&self, duration: Duration) {
|
||||||
|
self.inner.lock().unwrap().park_for = Some(duration);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Park for MockPark {
|
||||||
|
type Unpark = MockUnpark;
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn unpark(&self) -> Self::Unpark {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
MockUnpark { inner }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn park(&mut self) -> Result<(), Self::Error> {
|
||||||
|
let mut inner = self.inner.lock().map_err(|_| ())?;
|
||||||
|
|
||||||
|
let duration = inner.park_for.take().expect("call park_for first");
|
||||||
|
|
||||||
|
inner.advance(duration);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
|
||||||
|
let mut inner = self.inner.lock().unwrap();
|
||||||
|
|
||||||
|
if let Some(duration) = inner.park_for.take() {
|
||||||
|
inner.advance(duration);
|
||||||
|
} else {
|
||||||
|
inner.advance(duration);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Unpark for MockUnpark {
|
||||||
|
fn unpark(&self) {
|
||||||
|
if let Ok(mut inner) = self.inner.lock() {
|
||||||
|
inner.unparked = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Now for MockNow {
|
||||||
|
fn now(&self) -> Instant {
|
||||||
|
self.inner.lock().unwrap().now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl State {
|
||||||
|
fn now(&self) -> Instant {
|
||||||
|
self.base + self.advance
|
||||||
|
}
|
||||||
|
|
||||||
|
fn advance(&mut self, duration: Duration) {
|
||||||
|
self.advance += duration;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue