Document tower-retry (#147)
This commit is contained in:
parent
68ad8a00f2
commit
5db8510892
|
@ -1,15 +1,23 @@
|
|||
//! A retry "budget" for allowing only a certain amount of retries over time.
|
||||
|
||||
use std::fmt;
|
||||
use std::sync::{Mutex, atomic::{AtomicIsize, Ordering}};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use tokio_timer::clock;
|
||||
|
||||
/// Represents a "budget" for retrying requests.
|
||||
///
|
||||
/// This is useful for limiting the amount of retries a service can perform
|
||||
/// over a period of time, or per a certain number of requests attempted.
|
||||
pub struct Budget {
|
||||
bucket: Bucket,
|
||||
deposit_amount: isize,
|
||||
withdraw_amount: isize,
|
||||
}
|
||||
|
||||
/// Indicates that it is not currently allowed to "withdraw" another retry
|
||||
/// from the [`Budget`](Budget).
|
||||
#[derive(Debug)]
|
||||
pub struct Overdrawn {
|
||||
_inner: (),
|
||||
|
@ -40,6 +48,21 @@ struct Generation {
|
|||
// ===== impl Budget =====
|
||||
|
||||
impl Budget {
|
||||
/// Create a `Budget` that allows for a certain percent of the total
|
||||
/// requests to be retried.
|
||||
///
|
||||
/// - The `ttl` is the duration of how long a single `deposit` should be
|
||||
/// considered. Must be between 1 and 60 seconds.
|
||||
/// - The `min_per_sec` is the minimum rate of retries allowed to accomodate
|
||||
/// clients that have just started issuing requests, or clients that do
|
||||
/// not issue many requests per window.
|
||||
/// - The `retry_percent` is the percentage of calls to `deposit` that can
|
||||
/// be retried. This is in addition to any retries allowed for via
|
||||
/// `min_per_sec`. Must be between 0 and 1000.
|
||||
///
|
||||
/// As an example, if `0.1` is used, then for every 10 calls to `deposit`,
|
||||
/// 1 retry will be allowed. If `2.0` is used, then every `deposit`
|
||||
/// allows for 2 retries.
|
||||
pub fn new(ttl: Duration, min_per_sec: u32, retry_percent: f32) -> Self {
|
||||
// assertions taken from finagle
|
||||
assert!(ttl >= Duration::from_secs(1));
|
||||
|
@ -88,10 +111,16 @@ impl Budget {
|
|||
}
|
||||
}
|
||||
|
||||
/// Store a "deposit" in the budget, which will be used to permit future
|
||||
/// withdrawals.
|
||||
pub fn deposit(&self) {
|
||||
self.bucket.put(self.deposit_amount);
|
||||
}
|
||||
|
||||
/// Check whether there is enough "balance" in the budget to issue a new
|
||||
/// retry.
|
||||
///
|
||||
/// If there is not enough, an `Err(Overdrawn)` is returned.
|
||||
pub fn withdraw(&self) -> Result<(), Overdrawn> {
|
||||
if self.bucket.try_get(self.withdraw_amount) {
|
||||
Ok(())
|
||||
|
|
|
@ -1,3 +1,9 @@
|
|||
#![deny(missing_debug_implementations)]
|
||||
#![deny(missing_docs)]
|
||||
#![deny(warnings)]
|
||||
|
||||
//! Tower middleware for retrying "failed" requests.
|
||||
|
||||
#[macro_use]
|
||||
extern crate futures;
|
||||
extern crate tokio_timer;
|
||||
|
@ -8,12 +14,82 @@ use tower_service::Service;
|
|||
|
||||
pub mod budget;
|
||||
|
||||
/// A "retry policy" to classify if a request should be retried.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// extern crate futures;
|
||||
/// extern crate tower_retry;
|
||||
///
|
||||
/// use tower_retry::Policy;
|
||||
///
|
||||
/// type Req = String;
|
||||
/// type Res = String;
|
||||
///
|
||||
/// struct Attempts(usize);
|
||||
///
|
||||
/// impl<E> Policy<Req, Res, E> for Attempts {
|
||||
/// type Future = futures::future::FutureResult<Self, ()>;
|
||||
///
|
||||
/// fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
|
||||
/// match result {
|
||||
/// Ok(_) => {
|
||||
/// // Treat all `Response`s as success,
|
||||
/// // so don't retry...
|
||||
/// None
|
||||
/// },
|
||||
/// Err(_) => {
|
||||
/// // Treat all errors as failures...
|
||||
/// // But we limit the number of attempts...
|
||||
/// if self.0 > 0 {
|
||||
/// // Try again!
|
||||
/// Some(futures::future::ok(Attempts(self.0 - 1)))
|
||||
/// } else {
|
||||
/// // Used all our attempts, no retry...
|
||||
/// None
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// fn clone_request(&self, req: &Req) -> Option<Req> {
|
||||
/// Some(req.clone())
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
pub trait Policy<Req, Res, E>: Sized {
|
||||
/// The `Future` type returned by `Policy::retry()`.
|
||||
type Future: Future<Item=Self, Error=()>;
|
||||
/// Check the policy if a certain request should be retried.
|
||||
///
|
||||
/// This method is passed a reference to the original request, and either
|
||||
/// the `Service::Response` or `Service::Error` from the inner service.
|
||||
///
|
||||
/// If the request should **not** be retried, return `None`.
|
||||
///
|
||||
/// If the request *should* be retried, return `Some` future of a new
|
||||
/// policy that would apply for the next request attempt.
|
||||
///
|
||||
/// If the returned `Future` errors, the request will **not** be retried
|
||||
/// after all.
|
||||
fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future>;
|
||||
/// Tries to clone a request before being passed to the inner service.
|
||||
///
|
||||
/// If the request cannot be cloned, return `None`.
|
||||
fn clone_request(&self, req: &Req) -> Option<Req>;
|
||||
}
|
||||
|
||||
/// Configure retrying requests of "failed" responses.
|
||||
///
|
||||
/// A `Policy` classifies what is a "failed" response.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Retry<P, S> {
|
||||
policy: P,
|
||||
service: S,
|
||||
}
|
||||
|
||||
/// The `Future` returned by a `Retry` service.
|
||||
#[derive(Debug)]
|
||||
pub struct ResponseFuture<P, S, Request>
|
||||
where
|
||||
|
@ -35,16 +111,10 @@ enum State<F, P, R, E> {
|
|||
Retrying,
|
||||
}
|
||||
|
||||
pub trait Policy<Req, Res, E>: Sized {
|
||||
type Future: Future<Item=Self, Error=()>;
|
||||
fn retry(&self, req: &Req, res: Result<&Res, &E>) -> Option<Self::Future>;
|
||||
fn clone_request(&self, req: &Req) -> Option<Req>;
|
||||
}
|
||||
|
||||
|
||||
// ===== impl Retry =====
|
||||
|
||||
impl<P, S> Retry<P, S> {
|
||||
/// Retry the inner service depending on this [`Policy`][Policy}.
|
||||
pub fn new<Request>(policy: P, service: S) -> Self
|
||||
where
|
||||
P: Policy<Request, S::Response, S::Error> + Clone,
|
||||
|
|
Loading…
Reference in New Issue