diff --git a/tower-batch/src/service.rs b/tower-batch/src/service.rs index bbe1e582c..176d63fa3 100644 --- a/tower-batch/src/service.rs +++ b/tower-batch/src/service.rs @@ -73,21 +73,44 @@ where T::Error: Send + Sync, Request: Send + 'static, { + let (batch, worker) = Self::pair(service, max_items, max_latency); + tokio::spawn(worker.run()); + batch + } + + /// Creates a new `Batch` wrapping `service`, but returns the background worker. + /// + /// This is useful if you do not want to spawn directly onto the `tokio` + /// runtime but instead want to use your own executor. This will return the + /// `Batch` and the background `Worker` that you can then spawn. + pub fn pair( + service: T, + max_items: usize, + max_latency: std::time::Duration, + ) -> (Self, Worker) + where + T: Send + 'static, + T::Error: Send + Sync, + Request: Send + 'static, + { + let (tx, rx) = mpsc::unbounded_channel(); + // The semaphore bound limits the maximum number of concurrent requests // (specifically, requests which got a `Ready` from `poll_ready`, but haven't // used their semaphore reservation in a `call` yet). // We choose a bound that allows callers to check readiness for every item in // a batch, then actually submit those items. let bound = max_items; - let (tx, rx) = mpsc::unbounded_channel(); - let (handle, worker) = Worker::new(service, rx, max_items, max_latency); - tokio::spawn(worker.run()); let semaphore = Semaphore::new(bound); - Batch { + + let (handle, worker) = Worker::new(service, rx, max_items, max_latency); + let batch = Batch { tx, semaphore, handle, - } + }; + + (batch, worker) } fn get_worker_error(&self) -> crate::BoxError {