buffer: switch to TypedExecutor (#205)

This commit is contained in:
Carl Lerche 2019-03-25 10:56:12 -07:00 committed by GitHub
parent 0e70f1320e
commit bec3937e87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 24 additions and 22 deletions

View File

@ -5,11 +5,11 @@ authors = ["Carl Lerche <me@carllerche.com>"]
publish = false publish = false
[dependencies] [dependencies]
futures = "0.1" futures = "0.1.25"
tower-service = "0.2.0" tower-service = "0.2.0"
tower-layer = { version = "0.1", path = "../tower-layer" } tower-layer = { version = "0.1", path = "../tower-layer" }
tokio-executor = "0.1" tokio-executor = "0.1.7"
tokio-sync = "0.1" tokio-sync = "0.1.0"
[dev-dependencies] [dev-dependencies]
tower-mock = { version = "0.1", path = "../tower-mock" } tower-mock = { version = "0.1", path = "../tower-mock" }

View File

@ -62,7 +62,7 @@ impl<E> BufferLayer<E> {
where where
S: Service<Request>, S: Service<Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
E: WorkerExecutor<S, Request>, E: WorkerExecutor<S, Request> + Clone,
{ {
BufferLayer { bound, executor } BufferLayer { bound, executor }
} }
@ -72,7 +72,7 @@ impl<E, S, Request> Layer<S, Request> for BufferLayer<E>
where where
S: Service<Request>, S: Service<Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
E: WorkerExecutor<S, Request>, E: WorkerExecutor<S, Request> + Clone,
{ {
type Response = S::Response; type Response = S::Response;
type Error = Error; type Error = Error;
@ -80,7 +80,7 @@ where
type Service = Buffer<S, Request>; type Service = Buffer<S, Request>;
fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> { fn layer(&self, service: S) -> Result<Self::Service, Self::LayerError> {
Buffer::with_executor(service, self.bound, &self.executor) Buffer::with_executor(service, self.bound, &mut self.executor.clone())
} }
} }
@ -103,7 +103,7 @@ where
T::Error: Send + Sync, T::Error: Send + Sync,
Request: Send + 'static, Request: Send + 'static,
{ {
Self::with_executor(service, bound, &DefaultExecutor::current()) Self::with_executor(service, bound, &mut DefaultExecutor::current())
} }
/// Creates a new `Buffer` wrapping `service`. /// Creates a new `Buffer` wrapping `service`.
@ -114,7 +114,7 @@ where
/// ///
/// `bound` gives the maximal number of requests that can be queued for the service before /// `bound` gives the maximal number of requests that can be queued for the service before
/// backpressure is applied to callers. /// backpressure is applied to callers.
pub fn with_executor<E>(service: T, bound: usize, executor: &E) -> Result<Self, Error> pub fn with_executor<E>(service: T, bound: usize, executor: &mut E) -> Result<Self, Error>
where where
E: WorkerExecutor<T, Request>, E: WorkerExecutor<T, Request>,
{ {

View File

@ -1,8 +1,8 @@
use error::{Closed, Error, ServiceError, SpawnError}; use error::{Closed, Error, ServiceError, SpawnError};
use futures::future::Executor;
use futures::{Async, Future, Poll, Stream}; use futures::{Async, Future, Poll, Stream};
use message::Message; use message::Message;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use tokio_executor::TypedExecutor;
use tokio_sync::mpsc; use tokio_sync::mpsc;
use tower_service::Service; use tower_service::Service;
@ -33,14 +33,14 @@ pub(crate) struct Handle {
/// This trait allows you to use either Tokio's threaded runtime's executor or the `current_thread` /// This trait allows you to use either Tokio's threaded runtime's executor or the `current_thread`
/// runtime's executor depending on if `T` is `Send` or `!Send`. /// runtime's executor depending on if `T` is `Send` or `!Send`.
pub trait WorkerExecutor<T, Request>: Executor<Worker<T, Request>> pub trait WorkerExecutor<T, Request>: TypedExecutor<Worker<T, Request>>
where where
T: Service<Request>, T: Service<Request>,
T::Error: Into<Error>, T::Error: Into<Error>,
{ {
} }
impl<T, Request, E: Executor<Worker<T, Request>>> WorkerExecutor<T, Request> for E impl<T, Request, E: TypedExecutor<Worker<T, Request>>> WorkerExecutor<T, Request> for E
where where
T: Service<Request>, T: Service<Request>,
T::Error: Into<Error>, T::Error: Into<Error>,
@ -55,7 +55,7 @@ where
pub(crate) fn spawn<E>( pub(crate) fn spawn<E>(
service: T, service: T,
rx: mpsc::Receiver<Message<Request, T::Future>>, rx: mpsc::Receiver<Message<Request, T::Future>>,
executor: &E, executor: &mut E,
) -> Result<Handle, Error> ) -> Result<Handle, Error>
where where
E: WorkerExecutor<T, Request>, E: WorkerExecutor<T, Request>,
@ -73,7 +73,7 @@ where
handle: handle.clone(), handle: handle.clone(),
}; };
match executor.execute(worker) { match executor.spawn(worker) {
Ok(()) => Ok(handle), Ok(()) => Ok(handle),
Err(_) => Err(SpawnError::new().into()), Err(_) => Err(SpawnError::new().into()),
} }

View File

@ -1,9 +1,11 @@
extern crate futures; extern crate futures;
extern crate tokio_executor;
extern crate tower_buffer; extern crate tower_buffer;
extern crate tower_mock; extern crate tower_mock;
extern crate tower_service; extern crate tower_service;
use futures::prelude::*; use futures::prelude::*;
use tokio_executor::{SpawnError, TypedExecutor};
use tower_buffer::*; use tower_buffer::*;
use tower_service::*; use tower_service::*;
@ -113,9 +115,9 @@ fn poll_ready_when_worker_is_dropped_early() {
let (service, _handle) = Mock::new(); let (service, _handle) = Mock::new();
// drop that worker right on the floor! // drop that worker right on the floor!
let exec = ExecFn(drop); let mut exec = ExecFn(drop);
let mut service = Buffer::with_executor(service, 1, &exec).unwrap(); let mut service = Buffer::with_executor(service, 1, &mut exec).unwrap();
with_task(|| { with_task(|| {
service service
@ -130,9 +132,9 @@ fn response_future_when_worker_is_dropped_early() {
// hold the worker in a cell until we want to drop it later // hold the worker in a cell until we want to drop it later
let cell = RefCell::new(None); let cell = RefCell::new(None);
let exec = ExecFn(|fut| *cell.borrow_mut() = Some(fut)); let mut exec = ExecFn(|fut| *cell.borrow_mut() = Some(fut));
let mut service = Buffer::with_executor(service, 1, &exec).unwrap(); let mut service = Buffer::with_executor(service, 1, &mut exec).unwrap();
// keep the request in the worker // keep the request in the worker
handle.allow(0); handle.allow(0);
@ -149,11 +151,11 @@ type Handle = tower_mock::Handle<&'static str, &'static str>;
struct Exec; struct Exec;
impl<F> futures::future::Executor<F> for Exec impl<F> TypedExecutor<F> for Exec
where where
F: Future<Item = (), Error = ()> + Send + 'static, F: Future<Item = (), Error = ()> + Send + 'static,
{ {
fn execute(&self, fut: F) -> Result<(), futures::future::ExecuteError<F>> { fn spawn(&mut self, fut: F) -> Result<(), SpawnError> {
thread::spawn(move || { thread::spawn(move || {
fut.wait().unwrap(); fut.wait().unwrap();
}); });
@ -163,12 +165,12 @@ where
struct ExecFn<Func>(Func); struct ExecFn<Func>(Func);
impl<Func, F> futures::future::Executor<F> for ExecFn<Func> impl<Func, F> TypedExecutor<F> for ExecFn<Func>
where where
Func: Fn(F), Func: Fn(F),
F: Future<Item = (), Error = ()> + Send + 'static, F: Future<Item = (), Error = ()> + Send + 'static,
{ {
fn execute(&self, fut: F) -> Result<(), futures::future::ExecuteError<F>> { fn spawn(&mut self, fut: F) -> Result<(), SpawnError> {
(self.0)(fut); (self.0)(fut);
Ok(()) Ok(())
} }
@ -177,7 +179,7 @@ where
fn new_service() -> (Buffer<Mock, &'static str>, Handle) { fn new_service() -> (Buffer<Mock, &'static str>, Handle) {
let (service, handle) = Mock::new(); let (service, handle) = Mock::new();
// bound is >0 here because clears_canceled_requests needs multiple outstanding requests // bound is >0 here because clears_canceled_requests needs multiple outstanding requests
let service = Buffer::with_executor(service, 10, &Exec).unwrap(); let service = Buffer::with_executor(service, 10, &mut Exec).unwrap();
(service, handle) (service, handle)
} }