Make Buffer::new use DefaultExecutor (#122)

Fixes #121.
This commit is contained in:
Jon Gjengset 2018-11-27 14:14:43 -05:00 committed by Carl Lerche
parent 72508ff4ba
commit c5cb47d612
4 changed files with 63 additions and 43 deletions

View File

@ -56,34 +56,30 @@ fn main() {
println!("]");
let mut rt = runtime::Runtime::new().unwrap();
let executor = rt.executor();
let exec = executor.clone();
let fut = future::lazy(move || {
let decay = Duration::from_secs(10);
let d = gen_disco(exec.clone());
let d = gen_disco();
let pe = lb::Balance::p2c(lb::load::WithPeakEwma::new(d, decay, lb::load::NoInstrument));
run("P2C+PeakEWMA", pe, &exec)
run("P2C+PeakEWMA", pe)
});
let exec = executor.clone();
let fut = fut.and_then(move |_| {
let d = gen_disco(exec.clone());
let d = gen_disco();
let ll = lb::Balance::p2c(lb::load::WithPendingRequests::new(d, lb::load::NoInstrument));
run("P2C+LeastLoaded", ll, &exec)
run("P2C+LeastLoaded", ll)
});
let exec = executor;
let fut = fut.and_then(move |_| {
let rr = lb::Balance::round_robin(gen_disco(exec.clone()));
run("RoundRobin", rr, &exec)
let rr = lb::Balance::round_robin(gen_disco());
run("RoundRobin", rr)
});
rt.spawn(fut);
rt.shutdown_on_idle().wait().unwrap();
}
fn gen_disco(executor: runtime::TaskExecutor) -> Disco {
fn gen_disco() -> Disco {
use self::Change::Insert;
let mut changes = VecDeque::new();
@ -91,14 +87,10 @@ fn gen_disco(executor: runtime::TaskExecutor) -> Disco {
changes.push_back(Insert(i, DelayService(*latency)));
}
Disco { changes, executor }
Disco { changes }
}
fn run<D, C>(
name: &'static str,
lb: lb::Balance<D, C>,
executor: &runtime::TaskExecutor,
) -> impl Future<Item = (), Error = ()>
fn run<D, C>(name: &'static str, lb: lb::Balance<D, C>) -> impl Future<Item = (), Error = ()>
where
D: Discover + Send + 'static,
D::Key: Send,
@ -110,7 +102,7 @@ where
println!("{}", name);
let t0 = Instant::now();
compute_histo(SendRequests::new(lb, REQUESTS, CONCURRENCY, executor))
compute_histo(SendRequests::new(lb, REQUESTS, CONCURRENCY))
.map(move |h| report(&h, t0.elapsed()))
.map_err(|_| {})
}
@ -169,7 +161,6 @@ struct Delay {
struct Disco {
changes: VecDeque<Change<usize, DelayService>>,
executor: runtime::TaskExecutor,
}
#[derive(Debug)]
@ -222,7 +213,7 @@ impl Discover for Disco {
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::Error> {
match self.changes.pop_front() {
Some(Change::Insert(k, svc)) => {
let svc = Buffer::new(svc, &self.executor).unwrap();
let svc = Buffer::new(svc).unwrap();
let svc = InFlightLimit::new(svc, ENDPOINT_CAPACITY);
Ok(Async::Ready(Change::Insert(k, svc)))
}
@ -255,15 +246,10 @@ where
<D::Service as Service<Req>>::Future: Send,
C: lb::Choose<D::Key, D::Service> + Send + 'static,
{
pub fn new(
lb: lb::Balance<D, C>,
total: usize,
concurrency: usize,
executor: &runtime::TaskExecutor,
) -> Self {
pub fn new(lb: lb::Balance<D, C>, total: usize, concurrency: usize) -> Self {
Self {
send_remaining: total,
lb: InFlightLimit::new(Buffer::new(lb, executor).ok().expect("buffer"), concurrency),
lb: InFlightLimit::new(Buffer::new(lb).ok().expect("buffer"), concurrency),
responses: stream::FuturesUnordered::new(),
}
}

View File

@ -8,6 +8,7 @@ publish = false
futures = "0.1"
tower-service = { version = "0.1", path = "../tower-service" }
tower-direct-service = { version = "0.1", path = "../tower-direct-service" }
tokio-executor = "0.1"
[dev-dependencies]
tower-mock = { version = "0.1", path = "../tower-mock" }

View File

@ -12,6 +12,7 @@
#[macro_use]
extern crate futures;
extern crate tower_service;
extern crate tokio_executor;
extern crate tower_direct_service;
use futures::future::Executor;
@ -24,6 +25,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::{error, fmt};
use tower_service::Service;
use tokio_executor::DefaultExecutor;
use tower_direct_service::DirectService;
/// Adds a buffer in front of an inner service.
@ -113,17 +115,35 @@ where
}
}
/// Task that handles processing the buffer. This type should not be used
/// directly, instead `Buffer` requires an `Executor` that can accept this task.
pub struct Worker<T, Request>
mod sealed {
use super::*;
/// Task that handles processing the buffer. This type should not be used
/// directly, instead `Buffer` requires an `Executor` that can accept this task.
pub struct Worker<T, Request>
where
T: DirectService<Request>,
{
pub(crate) current_message: Option<Message<Request, T::Future>>,
pub(crate) rx: UnboundedReceiver<Message<Request, T::Future>>,
pub(crate) service: T,
pub(crate) finish: bool,
pub(crate) state: Arc<State>,
}
}
use sealed::Worker;
/// This trait allows you to use either Tokio's threaded runtime's executor or the `current_thread`
/// runtime's executor depending on if `T` is `Send` or `!Send`.
pub trait WorkerExecutor<T, Request>: Executor<sealed::Worker<T, Request>>
where
T: DirectService<Request>,
{
current_message: Option<Message<Request, T::Future>>,
rx: UnboundedReceiver<Message<Request, T::Future>>,
service: T,
finish: bool,
state: Arc<State>,
}
impl<T, Request, E: Executor<sealed::Worker<T, Request>>> WorkerExecutor<T, Request> for E where
T: DirectService<Request>
{
}
/// Error produced when spawning the worker fails
@ -153,14 +173,27 @@ impl<T, Request> Buffer<T, Request>
where
T: Service<Request>,
{
/// Creates a new `Buffer` wrapping `service`.
///
/// The default Tokio executor is used to run the given service, which means that this method
/// must be called while on the Tokio runtime.
pub fn new(service: T) -> Result<Self, SpawnError<T>>
where
T: Send + 'static,
T::Future: Send,
Request: Send + 'static,
{
Self::with_executor(service, &DefaultExecutor::current())
}
/// Creates a new `Buffer` wrapping `service`.
///
/// `executor` is used to spawn a new `Worker` task that is dedicated to
/// draining the buffer and dispatching the requests to the internal
/// service.
pub fn new<E>(service: T, executor: &E) -> Result<Self, SpawnError<T>>
pub fn with_executor<E>(service: T, executor: &E) -> Result<Self, SpawnError<T>>
where
E: Executor<Worker<DirectedService<T>, Request>>,
E: WorkerExecutor<DirectedService<T>, Request>,
{
let (tx, rx) = mpsc::unbounded();

View File

@ -58,14 +58,14 @@ fn clears_canceled_requests() {
type Mock = tower_mock::Mock<&'static str, &'static str, ()>;
type Handle = tower_mock::Handle<&'static str, &'static str, ()>;
type DirectedMock = tower_buffer::DirectedService<Mock>;
struct Exec;
impl futures::future::Executor<Worker<DirectedMock, &'static str>> for Exec {
fn execute(&self, fut: Worker<DirectedMock, &'static str>)
-> Result<(), futures::future::ExecuteError<Worker<DirectedMock, &'static str>>>
{
impl<F> futures::future::Executor<F> for Exec
where
F: Future<Item = (), Error = ()> + Send + 'static,
{
fn execute(&self, fut: F) -> Result<(), futures::future::ExecuteError<F>> {
thread::spawn(move || {
fut.wait().unwrap();
});
@ -75,7 +75,7 @@ impl futures::future::Executor<Worker<DirectedMock, &'static str>> for Exec {
fn new_service() -> (Buffer<Mock, &'static str>, Handle) {
let (service, handle) = Mock::new();
let service = Buffer::new(service, &Exec).unwrap();
let service = Buffer::with_executor(service, &Exec).unwrap();
(service, handle)
}