diff --git a/tower-buffer/src/layer.rs b/tower-buffer/src/layer.rs new file mode 100644 index 0000000..8b82145 --- /dev/null +++ b/tower-buffer/src/layer.rs @@ -0,0 +1,46 @@ +use crate::{error::Error, service::Buffer, worker::WorkerExecutor}; +use tokio_executor::DefaultExecutor; +use tower_layer::Layer; +use tower_service::Service; + +/// Buffer requests with a bounded buffer +pub struct BufferLayer { + bound: usize, + executor: E, +} + +impl BufferLayer { + pub fn new(bound: usize) -> Self { + BufferLayer { + bound, + executor: DefaultExecutor::current(), + } + } +} + +impl BufferLayer { + pub fn with_executor(bound: usize, executor: E) -> Self + where + S: Service, + S::Error: Into, + E: WorkerExecutor + Clone, + { + BufferLayer { bound, executor } + } +} + +impl Layer for BufferLayer +where + S: Service, + S::Error: Into, + E: WorkerExecutor + Clone, +{ + type Response = S::Response; + type Error = Error; + type LayerError = Error; + type Service = Buffer; + + fn layer(&self, service: S) -> Result { + Buffer::with_executor(service, self.bound, &mut self.executor.clone()) + } +} diff --git a/tower-buffer/src/lib.rs b/tower-buffer/src/lib.rs index 30165b3..214f2a8 100644 --- a/tower-buffer/src/lib.rs +++ b/tower-buffer/src/lib.rs @@ -10,163 +10,11 @@ pub mod error; pub mod future; +mod layer; mod message; +mod service; mod worker; +pub use crate::layer::BufferLayer; +pub use crate::service::Buffer; pub use crate::worker::WorkerExecutor; - -use crate::{error::Error, future::ResponseFuture, message::Message, worker::Worker}; - -use futures::Poll; -use tokio_executor::DefaultExecutor; -use tokio_sync::{mpsc, oneshot}; -use tower_layer::Layer; -use tower_service::Service; - -/// Adds a buffer in front of an inner service. -/// -/// See crate level documentation for more details. -pub struct Buffer -where - T: Service, -{ - tx: mpsc::Sender>, - worker: worker::Handle, -} - -/// Buffer requests with a bounded buffer -pub struct BufferLayer { - bound: usize, - executor: E, -} - -impl BufferLayer { - pub fn new(bound: usize) -> Self { - BufferLayer { - bound, - executor: DefaultExecutor::current(), - } - } -} - -impl BufferLayer { - pub fn with_executor(bound: usize, executor: E) -> Self - where - S: Service, - S::Error: Into, - E: WorkerExecutor + Clone, - { - BufferLayer { bound, executor } - } -} - -impl Layer for BufferLayer -where - S: Service, - S::Error: Into, - E: WorkerExecutor + Clone, -{ - type Response = S::Response; - type Error = Error; - type LayerError = Error; - type Service = Buffer; - - fn layer(&self, service: S) -> Result { - Buffer::with_executor(service, self.bound, &mut self.executor.clone()) - } -} - -impl Buffer -where - T: Service, - T::Error: Into, -{ - /// Creates a new `Buffer` wrapping `service`. - /// - /// `bound` gives the maximal number of requests that can be queued for the service before - /// backpressure is applied to callers. - /// - /// The default Tokio executor is used to run the given service, which means that this method - /// must be called while on the Tokio runtime. - pub fn new(service: T, bound: usize) -> Result - where - T: Send + 'static, - T::Future: Send, - T::Error: Send + Sync, - Request: Send + 'static, - { - Self::with_executor(service, bound, &mut DefaultExecutor::current()) - } - - /// Creates a new `Buffer` wrapping `service`. - /// - /// `executor` is used to spawn a new `Worker` task that is dedicated to - /// draining the buffer and dispatching the requests to the internal - /// service. - /// - /// `bound` gives the maximal number of requests that can be queued for the service before - /// backpressure is applied to callers. - pub fn with_executor(service: T, bound: usize, executor: &mut E) -> Result - where - E: WorkerExecutor, - { - let (tx, rx) = mpsc::channel(bound); - - Worker::spawn(service, rx, executor).map(|worker| Buffer { tx, worker }) - } -} - -impl Service for Buffer -where - T: Service, - T::Error: Into, -{ - type Response = T::Response; - type Error = Error; - type Future = ResponseFuture; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - // If the inner service has errored, then we error here. - self.tx - .poll_ready() - .map_err(|_| self.worker.get_error_on_closed()) - } - - fn call(&mut self, request: Request) -> Self::Future { - // TODO: - // ideally we'd poll_ready again here so we don't allocate the oneshot - // if the try_send is about to fail, but sadly we can't call poll_ready - // outside of task context. - let (tx, rx) = oneshot::channel(); - - match self.tx.try_send(Message { request, tx }) { - Err(e) => { - if e.is_closed() { - ResponseFuture::failed(self.worker.get_error_on_closed()) - } else { - // When `mpsc::Sender::poll_ready` returns `Ready`, a slot - // in the channel is reserved for the handle. Other `Sender` - // handles may not send a message using that slot. This - // guarantees capacity for `request`. - // - // Given this, the only way to hit this code path is if - // `poll_ready` has not been called & `Ready` returned. - panic!("buffer full; poll_ready must be called first"); - } - } - Ok(_) => ResponseFuture::new(rx), - } - } -} - -impl Clone for Buffer -where - T: Service, -{ - fn clone(&self) -> Self { - Self { - tx: self.tx.clone(), - worker: self.worker.clone(), - } - } -} diff --git a/tower-buffer/src/service.rs b/tower-buffer/src/service.rs new file mode 100644 index 0000000..82f804a --- /dev/null +++ b/tower-buffer/src/service.rs @@ -0,0 +1,117 @@ +use crate::{ + error::Error, + future::ResponseFuture, + message::Message, + worker::{Handle, Worker, WorkerExecutor}, +}; + +use futures::Poll; +use tokio_executor::DefaultExecutor; +use tokio_sync::{mpsc, oneshot}; +use tower_service::Service; + +/// Adds a buffer in front of an inner service. +/// +/// See crate level documentation for more details. +pub struct Buffer +where + T: Service, +{ + tx: mpsc::Sender>, + worker: Handle, +} + +impl Buffer +where + T: Service, + T::Error: Into, +{ + /// Creates a new `Buffer` wrapping `service`. + /// + /// `bound` gives the maximal number of requests that can be queued for the service before + /// backpressure is applied to callers. + /// + /// The default Tokio executor is used to run the given service, which means that this method + /// must be called while on the Tokio runtime. + pub fn new(service: T, bound: usize) -> Result + where + T: Send + 'static, + T::Future: Send, + T::Error: Send + Sync, + Request: Send + 'static, + { + Self::with_executor(service, bound, &mut DefaultExecutor::current()) + } + + /// Creates a new `Buffer` wrapping `service`. + /// + /// `executor` is used to spawn a new `Worker` task that is dedicated to + /// draining the buffer and dispatching the requests to the internal + /// service. + /// + /// `bound` gives the maximal number of requests that can be queued for the service before + /// backpressure is applied to callers. + pub fn with_executor(service: T, bound: usize, executor: &mut E) -> Result + where + E: WorkerExecutor, + { + let (tx, rx) = mpsc::channel(bound); + + Worker::spawn(service, rx, executor).map(|worker| Buffer { tx, worker }) + } +} + +impl Service for Buffer +where + T: Service, + T::Error: Into, +{ + type Response = T::Response; + type Error = Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + // If the inner service has errored, then we error here. + self.tx + .poll_ready() + .map_err(|_| self.worker.get_error_on_closed()) + } + + fn call(&mut self, request: Request) -> Self::Future { + // TODO: + // ideally we'd poll_ready again here so we don't allocate the oneshot + // if the try_send is about to fail, but sadly we can't call poll_ready + // outside of task context. + let (tx, rx) = oneshot::channel(); + + match self.tx.try_send(Message { request, tx }) { + Err(e) => { + if e.is_closed() { + ResponseFuture::failed(self.worker.get_error_on_closed()) + } else { + // When `mpsc::Sender::poll_ready` returns `Ready`, a slot + // in the channel is reserved for the handle. Other `Sender` + // handles may not send a message using that slot. This + // guarantees capacity for `request`. + // + // Given this, the only way to hit this code path is if + // `poll_ready` has not been called & `Ready` returned. + panic!("buffer full; poll_ready must be called first"); + } + } + Ok(_) => ResponseFuture::new(rx), + } + } +} + +impl Clone for Buffer +where + T: Service, +{ + fn clone(&self) -> Self { + Self { + tx: self.tx.clone(), + worker: self.worker.clone(), + } + } +}