split pair constructor off of Batch::new

This commit is contained in:
Jane Lusby 2021-03-15 09:27:26 -07:00 committed by teor
parent 9e1662d2d7
commit c10ea1d82b
1 changed files with 28 additions and 5 deletions

View File

@ -73,21 +73,44 @@ where
T::Error: Send + Sync, T::Error: Send + Sync,
Request: Send + 'static, Request: Send + 'static,
{ {
let (batch, worker) = Self::pair(service, max_items, max_latency);
tokio::spawn(worker.run());
batch
}
/// Creates a new `Batch` wrapping `service`, but returns the background worker.
///
/// This is useful if you do not want to spawn directly onto the `tokio`
/// runtime but instead want to use your own executor. This will return the
/// `Batch` and the background `Worker` that you can then spawn.
pub fn pair(
service: T,
max_items: usize,
max_latency: std::time::Duration,
) -> (Self, Worker<T, Request>)
where
T: Send + 'static,
T::Error: Send + Sync,
Request: Send + 'static,
{
let (tx, rx) = mpsc::unbounded_channel();
// The semaphore bound limits the maximum number of concurrent requests // The semaphore bound limits the maximum number of concurrent requests
// (specifically, requests which got a `Ready` from `poll_ready`, but haven't // (specifically, requests which got a `Ready` from `poll_ready`, but haven't
// used their semaphore reservation in a `call` yet). // used their semaphore reservation in a `call` yet).
// We choose a bound that allows callers to check readiness for every item in // We choose a bound that allows callers to check readiness for every item in
// a batch, then actually submit those items. // a batch, then actually submit those items.
let bound = max_items; let bound = max_items;
let (tx, rx) = mpsc::unbounded_channel();
let (handle, worker) = Worker::new(service, rx, max_items, max_latency);
tokio::spawn(worker.run());
let semaphore = Semaphore::new(bound); let semaphore = Semaphore::new(bound);
Batch {
let (handle, worker) = Worker::new(service, rx, max_items, max_latency);
let batch = Batch {
tx, tx,
semaphore, semaphore,
handle, handle,
} };
(batch, worker)
} }
fn get_worker_error(&self) -> crate::BoxError { fn get_worker_error(&self) -> crate::BoxError {