diff --git a/tower/src/buffer/layer.rs b/tower/src/buffer/layer.rs index 01e7100..b744cb1 100644 --- a/tower/src/buffer/layer.rs +++ b/tower/src/buffer/layer.rs @@ -33,7 +33,6 @@ impl BufferLayer { BufferLayer { bound, _p: PhantomData, - _e: PhantomData, } } } @@ -51,7 +50,7 @@ where type Service = Buffer; fn layer(&self, service: S) -> Self::Service { - Buffer::new(service, self.bound) + Buffer::new2(service, self.bound) } } diff --git a/tower/src/buffer/service.rs b/tower/src/buffer/service.rs index 06d4c55..1ab4d22 100644 --- a/tower/src/buffer/service.rs +++ b/tower/src/buffer/service.rs @@ -21,6 +21,43 @@ where handle: Handle, } +impl Buffer +where + S: Service, + S::Error: Into + Clone, + crate::buffer::error::Closed: Into, +{ + /// Creates a new `Buffer` wrapping `service`. + /// + /// `bound` gives the maximal number of requests that can be queued for the service before + /// backpressure is applied to callers. + /// + /// The default Tokio executor is used to run the given service, which means that this method + /// must be called while on the Tokio runtime. + /// + /// # A note on choosing a `bound` + /// + /// When `Buffer`'s implementation of `poll_ready` returns `Poll::Ready`, it reserves a + /// slot in the channel for the forthcoming `call()`. However, if this call doesn't arrive, + /// this reserved slot may be held up for a long time. As a result, it's advisable to set + /// `bound` to be at least the maximum number of concurrent requests the `Buffer` will see. + /// If you do not, all the slots in the buffer may be held up by futures that have just called + /// `poll_ready` but will not issue a `call`, which prevents other senders from issuing new + /// requests. + pub fn new(service: S, bound: usize) -> Self + where + S: Send + 'static, + S::Future: Send, + S::Error: Send + Sync + std::fmt::Display, + Request: Send + 'static, + { + let (tx, rx) = mpsc::channel(bound); + let (handle, worker) = Worker::new(service, rx); + tokio::spawn(worker); + Buffer { tx, handle } + } +} + impl Buffer where S: Service, @@ -45,7 +82,7 @@ where /// If you do not, all the slots in the buffer may be held up by futures that have just called /// `poll_ready` but will not issue a `call`, which prevents other senders from issuing new /// requests. - pub fn new(service: S, bound: usize) -> Self + pub fn new2(service: S, bound: usize) -> Self where S: Send + 'static, S::Future: Send,