2022-07-17 15:41:18 -07:00
|
|
|
//! Batch worker item handling and run loop implementation.
|
|
|
|
|
2021-03-15 10:21:29 -07:00
|
|
|
use std::{
|
|
|
|
pin::Pin,
|
|
|
|
sync::{Arc, Mutex},
|
|
|
|
};
|
2020-11-19 11:54:20 -08:00
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
use futures::{
|
|
|
|
future::{BoxFuture, OptionFuture},
|
|
|
|
stream::FuturesUnordered,
|
|
|
|
FutureExt, StreamExt,
|
|
|
|
};
|
2020-06-12 10:22:08 -07:00
|
|
|
use pin_project::pin_project;
|
2020-06-12 11:42:28 -07:00
|
|
|
use tokio::{
|
|
|
|
sync::mpsc,
|
2020-11-19 11:54:20 -08:00
|
|
|
time::{sleep, Sleep},
|
2020-06-12 10:22:08 -07:00
|
|
|
};
|
2022-07-17 15:41:18 -07:00
|
|
|
use tokio_util::sync::PollSemaphore;
|
2020-06-12 11:42:28 -07:00
|
|
|
use tower::{Service, ServiceExt};
|
|
|
|
use tracing_futures::Instrument;
|
2020-06-12 10:22:08 -07:00
|
|
|
|
2020-11-19 11:54:20 -08:00
|
|
|
use super::{
|
|
|
|
error::{Closed, ServiceError},
|
|
|
|
message::{self, Message},
|
|
|
|
BatchControl,
|
|
|
|
};
|
|
|
|
|
2020-06-12 10:22:08 -07:00
|
|
|
/// Task that handles processing the buffer. This type should not be used
|
|
|
|
/// directly, instead `Buffer` requires an `Executor` that can accept this task.
|
|
|
|
///
|
|
|
|
/// The struct is `pub` in the private module and the type is *not* re-exported
|
|
|
|
/// as part of the public API. This is the "sealed" pattern to include "private"
|
|
|
|
/// types in public traits that are not meant for consumers of the library to
|
|
|
|
/// implement (only call).
|
2021-03-15 10:21:29 -07:00
|
|
|
#[pin_project(PinnedDrop)]
|
2020-06-12 10:22:08 -07:00
|
|
|
#[derive(Debug)]
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
pub struct Worker<T, Request>
|
2020-06-12 10:22:08 -07:00
|
|
|
where
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
T: Service<BatchControl<Request>>,
|
2022-07-17 15:43:29 -07:00
|
|
|
T::Future: Send + 'static,
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
T::Error: Into<crate::BoxError>,
|
2020-06-12 10:22:08 -07:00
|
|
|
{
|
2022-07-17 15:43:29 -07:00
|
|
|
// Batch management
|
|
|
|
//
|
2022-07-17 15:41:18 -07:00
|
|
|
/// A semaphore-bounded channel for receiving requests from the batch wrapper service.
|
2020-11-19 11:54:20 -08:00
|
|
|
rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
|
2022-07-17 15:41:18 -07:00
|
|
|
|
|
|
|
/// The wrapped service that processes batches.
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
service: T,
|
2022-07-17 15:41:18 -07:00
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
/// The number of pending items sent to `service`, since the last batch flush.
|
|
|
|
pending_items: usize,
|
|
|
|
|
|
|
|
/// The timer for the pending batch, if it has any items.
|
|
|
|
///
|
|
|
|
/// The timer is started when the first entry of a new batch is
|
|
|
|
/// submitted, so that the batch latency of all entries is at most
|
|
|
|
/// self.max_latency. However, we don't keep the timer running unless
|
|
|
|
/// there is a pending request to prevent wakeups on idle services.
|
|
|
|
pending_batch_timer: Option<Pin<Box<Sleep>>>,
|
|
|
|
|
|
|
|
/// The batches that the worker is concurrently executing.
|
|
|
|
concurrent_batches: FuturesUnordered<BoxFuture<'static, Result<T::Response, T::Error>>>,
|
|
|
|
|
|
|
|
// Errors and termination
|
|
|
|
//
|
2022-07-17 15:41:18 -07:00
|
|
|
/// An error that's populated on permanent service failure.
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
failed: Option<ServiceError>,
|
2022-07-17 15:41:18 -07:00
|
|
|
|
|
|
|
/// A shared error handle that's populated on permanent service failure.
|
|
|
|
error_handle: ErrorHandle,
|
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
/// A cloned copy of the wrapper service's semaphore, used to close the semaphore.
|
|
|
|
close: PollSemaphore,
|
|
|
|
|
|
|
|
// Config
|
|
|
|
//
|
2022-07-17 15:41:18 -07:00
|
|
|
/// The maximum number of items allowed in a batch.
|
2022-07-17 15:43:29 -07:00
|
|
|
max_items_in_batch: usize,
|
2022-07-17 15:41:18 -07:00
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
/// The maximum number of batches that are allowed to run concurrently.
|
|
|
|
max_concurrent_batches: usize,
|
2022-07-17 15:41:18 -07:00
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
/// The maximum delay before processing a batch with fewer than `max_items_in_batch`.
|
|
|
|
max_latency: std::time::Duration,
|
2020-06-12 10:22:08 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Get the error out
|
|
|
|
#[derive(Debug)]
|
2022-07-17 15:41:18 -07:00
|
|
|
pub(crate) struct ErrorHandle {
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
inner: Arc<Mutex<Option<ServiceError>>>,
|
2020-06-12 10:22:08 -07:00
|
|
|
}
|
|
|
|
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
impl<T, Request> Worker<T, Request>
|
2020-06-12 10:22:08 -07:00
|
|
|
where
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
T: Service<BatchControl<Request>>,
|
2022-07-17 15:43:29 -07:00
|
|
|
T::Future: Send + 'static,
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
T::Error: Into<crate::BoxError>,
|
2020-06-12 10:22:08 -07:00
|
|
|
{
|
2022-07-17 15:43:29 -07:00
|
|
|
/// Creates a new batch worker.
|
|
|
|
///
|
|
|
|
/// See [`Service::new()`](crate::Service::new) for details.
|
2020-06-12 10:22:08 -07:00
|
|
|
pub(crate) fn new(
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
service: T,
|
2020-11-19 11:54:20 -08:00
|
|
|
rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
|
2022-07-17 15:43:29 -07:00
|
|
|
max_items_in_batch: usize,
|
|
|
|
max_concurrent_batches: usize,
|
2020-06-12 11:42:28 -07:00
|
|
|
max_latency: std::time::Duration,
|
2022-07-17 15:41:18 -07:00
|
|
|
close: PollSemaphore,
|
|
|
|
) -> (ErrorHandle, Worker<T, Request>) {
|
|
|
|
let error_handle = ErrorHandle {
|
2020-06-12 10:22:08 -07:00
|
|
|
inner: Arc::new(Mutex::new(None)),
|
|
|
|
};
|
|
|
|
|
|
|
|
let worker = Worker {
|
|
|
|
rx,
|
|
|
|
service,
|
2022-07-17 15:43:29 -07:00
|
|
|
pending_items: 0,
|
|
|
|
pending_batch_timer: None,
|
|
|
|
concurrent_batches: FuturesUnordered::new(),
|
2020-06-12 11:42:28 -07:00
|
|
|
failed: None,
|
2022-07-17 15:43:29 -07:00
|
|
|
error_handle: error_handle.clone(),
|
2022-07-17 15:41:18 -07:00
|
|
|
close,
|
2022-07-17 15:43:29 -07:00
|
|
|
max_items_in_batch,
|
|
|
|
max_concurrent_batches,
|
|
|
|
max_latency,
|
2020-06-12 10:22:08 -07:00
|
|
|
};
|
|
|
|
|
2022-07-17 15:41:18 -07:00
|
|
|
(error_handle, worker)
|
2020-06-12 10:22:08 -07:00
|
|
|
}
|
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
/// Process a single worker request.
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
async fn process_req(&mut self, req: Request, tx: message::Tx<T::Future>) {
|
2022-07-17 15:43:29 -07:00
|
|
|
if let Some(ref error) = self.failed {
|
|
|
|
tracing::trace!(
|
|
|
|
?error,
|
|
|
|
"notifying batch request caller about worker failure",
|
|
|
|
);
|
|
|
|
let _ = tx.send(Err(error.clone()));
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
match self.service.ready().await {
|
|
|
|
Ok(svc) => {
|
|
|
|
let rsp = svc.call(req.into());
|
|
|
|
let _ = tx.send(Ok(rsp));
|
|
|
|
|
|
|
|
self.pending_items += 1;
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
self.failed(e.into());
|
|
|
|
let _ = tx.send(Err(self
|
|
|
|
.failed
|
|
|
|
.as_ref()
|
|
|
|
.expect("Worker::failed did not set self.failed?")
|
|
|
|
.clone()));
|
2020-06-12 10:22:08 -07:00
|
|
|
}
|
2020-06-12 11:42:28 -07:00
|
|
|
}
|
|
|
|
}
|
2020-06-12 10:22:08 -07:00
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
/// Tell the inner service to flush the current batch.
|
|
|
|
///
|
|
|
|
/// Waits until the inner service is ready,
|
|
|
|
/// then stores a future which resolves when the batch finishes.
|
2020-06-12 11:42:28 -07:00
|
|
|
async fn flush_service(&mut self) {
|
2022-07-17 15:43:29 -07:00
|
|
|
if self.failed.is_some() {
|
|
|
|
tracing::trace!("worker failure: skipping flush");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
match self.service.ready().await {
|
|
|
|
Ok(ready_service) => {
|
|
|
|
let flush_future = ready_service.call(BatchControl::Flush);
|
|
|
|
self.concurrent_batches.push(flush_future.boxed());
|
|
|
|
|
|
|
|
// Now we have an empty batch.
|
|
|
|
self.pending_items = 0;
|
|
|
|
self.pending_batch_timer = None;
|
|
|
|
}
|
|
|
|
Err(error) => {
|
|
|
|
self.failed(error.into());
|
|
|
|
}
|
2020-06-12 10:22:08 -07:00
|
|
|
}
|
2022-07-17 15:43:29 -07:00
|
|
|
}
|
2022-07-17 15:41:18 -07:00
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
/// Is the current number of concurrent batches above the configured limit?
|
|
|
|
fn can_spawn_new_batches(&self) -> bool {
|
|
|
|
self.concurrent_batches.len() < self.max_concurrent_batches
|
2020-06-12 11:42:28 -07:00
|
|
|
}
|
2020-06-12 10:22:08 -07:00
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
/// Run loop for batch requests, which implements the batch policies.
|
|
|
|
///
|
|
|
|
/// See [`Service::new()`](crate::Service::new) for details.
|
2020-06-12 11:42:28 -07:00
|
|
|
pub async fn run(mut self) {
|
|
|
|
loop {
|
2022-07-17 15:43:29 -07:00
|
|
|
// Wait on either a new message or the batch timer.
|
|
|
|
//
|
|
|
|
// If both are ready, end the batch now, because the timer has elapsed.
|
|
|
|
// If the timer elapses, any pending messages are preserved:
|
|
|
|
// https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety
|
|
|
|
tokio::select! {
|
|
|
|
biased;
|
|
|
|
|
|
|
|
batch_result = self.concurrent_batches.next(), if !self.concurrent_batches.is_empty() => match batch_result.expect("only returns None when empty") {
|
|
|
|
Ok(_response) => {
|
|
|
|
tracing::trace!(
|
|
|
|
pending_items = self.pending_items,
|
|
|
|
batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
|
|
|
|
running_batches = self.concurrent_batches.len(),
|
|
|
|
"batch finished executing",
|
|
|
|
);
|
|
|
|
}
|
|
|
|
Err(error) => {
|
|
|
|
let error = error.into();
|
|
|
|
tracing::trace!(?error, "batch execution failed");
|
|
|
|
self.failed(error);
|
|
|
|
}
|
|
|
|
},
|
|
|
|
|
|
|
|
Some(()) = OptionFuture::from(self.pending_batch_timer.as_mut()), if self.pending_batch_timer.as_ref().is_some() => {
|
|
|
|
tracing::trace!(
|
|
|
|
pending_items = self.pending_items,
|
|
|
|
batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
|
|
|
|
running_batches = self.concurrent_batches.len(),
|
|
|
|
"batch timer expired",
|
|
|
|
);
|
|
|
|
|
|
|
|
// TODO: use a batch-specific span to instrument this future.
|
|
|
|
self.flush_service().await;
|
|
|
|
},
|
|
|
|
|
|
|
|
maybe_msg = self.rx.recv(), if self.can_spawn_new_batches() => match maybe_msg {
|
2020-06-12 11:42:28 -07:00
|
|
|
Some(msg) => {
|
2022-07-17 15:43:29 -07:00
|
|
|
tracing::trace!(
|
|
|
|
pending_items = self.pending_items,
|
|
|
|
batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
|
|
|
|
running_batches = self.concurrent_batches.len(),
|
|
|
|
"batch message received",
|
|
|
|
);
|
|
|
|
|
2020-06-12 11:42:28 -07:00
|
|
|
let span = msg.span;
|
2022-07-17 15:43:29 -07:00
|
|
|
|
2020-06-12 11:42:28 -07:00
|
|
|
self.process_req(msg.request, msg.tx)
|
2022-07-17 15:43:29 -07:00
|
|
|
// Apply the provided span to request processing.
|
2020-06-12 11:42:28 -07:00
|
|
|
.instrument(span)
|
|
|
|
.await;
|
2022-07-17 15:43:29 -07:00
|
|
|
|
|
|
|
// Check whether we have too many pending items.
|
|
|
|
if self.pending_items >= self.max_items_in_batch {
|
|
|
|
tracing::trace!(
|
|
|
|
pending_items = self.pending_items,
|
|
|
|
batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
|
|
|
|
running_batches = self.concurrent_batches.len(),
|
|
|
|
"batch is full",
|
|
|
|
);
|
|
|
|
|
2022-07-17 15:41:18 -07:00
|
|
|
// TODO: use a batch-specific span to instrument this future.
|
|
|
|
self.flush_service().await;
|
2022-07-17 15:43:29 -07:00
|
|
|
} else if self.pending_items == 1 {
|
|
|
|
tracing::trace!(
|
|
|
|
pending_items = self.pending_items,
|
|
|
|
batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
|
|
|
|
running_batches = self.concurrent_batches.len(),
|
|
|
|
"batch is new, starting timer",
|
|
|
|
);
|
|
|
|
|
|
|
|
// The first message in a new batch.
|
|
|
|
self.pending_batch_timer = Some(Box::pin(sleep(self.max_latency)));
|
|
|
|
} else {
|
|
|
|
tracing::trace!(
|
|
|
|
pending_items = self.pending_items,
|
|
|
|
batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
|
|
|
|
running_batches = self.concurrent_batches.len(),
|
|
|
|
"waiting for full batch or batch timer",
|
|
|
|
);
|
2022-07-17 15:41:18 -07:00
|
|
|
}
|
2022-07-17 15:43:29 -07:00
|
|
|
}
|
|
|
|
None => {
|
|
|
|
tracing::trace!("batch channel closed and emptied, exiting worker task");
|
2022-07-17 15:41:18 -07:00
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
return;
|
2020-06-12 11:42:28 -07:00
|
|
|
}
|
2022-07-17 15:43:29 -07:00
|
|
|
},
|
2020-06-12 10:22:08 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
/// Register an inner service failure.
|
|
|
|
///
|
|
|
|
/// The underlying service failed when we called `poll_ready` on it with the given `error`. We
|
|
|
|
/// need to communicate this to all the `Buffer` handles. To do so, we wrap up the error in
|
|
|
|
/// an `Arc`, send that `Arc<E>` to all pending requests, and store it so that subsequent
|
|
|
|
/// requests will also fail with the same error.
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
fn failed(&mut self, error: crate::BoxError) {
|
2022-07-17 15:43:29 -07:00
|
|
|
tracing::debug!(?error, "batch worker error");
|
2020-06-12 10:22:08 -07:00
|
|
|
|
2022-07-17 15:41:18 -07:00
|
|
|
// Note that we need to handle the case where some error_handle is concurrently trying to send us
|
2020-06-12 10:22:08 -07:00
|
|
|
// a request. We need to make sure that *either* the send of the request fails *or* it
|
|
|
|
// receives an error on the `oneshot` it constructed. Specifically, we want to avoid the
|
|
|
|
// case where we send errors to all outstanding requests, and *then* the caller sends its
|
|
|
|
// request. We do this by *first* exposing the error, *then* closing the channel used to
|
|
|
|
// send more requests (so the client will see the error when the send fails), and *then*
|
|
|
|
// sending the error to all outstanding requests.
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
let error = ServiceError::new(error);
|
2020-06-12 10:22:08 -07:00
|
|
|
|
2022-07-17 15:41:18 -07:00
|
|
|
let mut inner = self.error_handle.inner.lock().unwrap();
|
2020-06-12 10:22:08 -07:00
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
// Ignore duplicate failures
|
2020-06-12 10:22:08 -07:00
|
|
|
if inner.is_some() {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2020-06-17 17:00:19 -07:00
|
|
|
*inner = Some(error.clone());
|
2020-06-12 10:22:08 -07:00
|
|
|
drop(inner);
|
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
tracing::trace!(
|
|
|
|
?error,
|
|
|
|
"worker failure: waking pending requests so they can be failed",
|
|
|
|
);
|
2020-06-12 10:22:08 -07:00
|
|
|
self.rx.close();
|
2022-07-17 15:43:29 -07:00
|
|
|
self.close.close();
|
|
|
|
|
|
|
|
// We don't schedule any batches on an errored service
|
|
|
|
self.pending_batch_timer = None;
|
2020-06-12 10:22:08 -07:00
|
|
|
|
2020-06-12 11:42:28 -07:00
|
|
|
// By closing the mpsc::Receiver, we know that the run() loop will
|
|
|
|
// drain all pending requests. We just need to make sure that any
|
|
|
|
// requests that we receive before we've exhausted the receiver receive
|
|
|
|
// the error:
|
2020-06-12 10:22:08 -07:00
|
|
|
self.failed = Some(error);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-07-17 15:41:18 -07:00
|
|
|
impl ErrorHandle {
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
pub(crate) fn get_error_on_closed(&self) -> crate::BoxError {
|
2020-06-12 10:22:08 -07:00
|
|
|
self.inner
|
|
|
|
.lock()
|
2022-07-17 15:41:18 -07:00
|
|
|
.expect("previous task panicked while holding the error handle mutex")
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
.as_ref()
|
|
|
|
.map(|svc_err| svc_err.clone().into())
|
2020-06-12 10:22:08 -07:00
|
|
|
.unwrap_or_else(|| Closed::new().into())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-07-17 15:41:18 -07:00
|
|
|
impl Clone for ErrorHandle {
    /// Creates another handle by cloning the `inner` field.
    fn clone(&self) -> ErrorHandle {
        let inner = self.inner.clone();
        Self { inner }
    }
}
|
2021-03-15 10:21:29 -07:00
|
|
|
|
|
|
|
#[pin_project::pinned_drop]
|
|
|
|
impl<T, Request> PinnedDrop for Worker<T, Request>
|
|
|
|
where
|
|
|
|
T: Service<BatchControl<Request>>,
|
2022-07-17 15:43:29 -07:00
|
|
|
T::Future: Send + 'static,
|
2021-03-15 10:21:29 -07:00
|
|
|
T::Error: Into<crate::BoxError>,
|
|
|
|
{
|
|
|
|
fn drop(mut self: Pin<&mut Self>) {
|
2022-07-17 15:43:29 -07:00
|
|
|
tracing::trace!(
|
|
|
|
pending_items = self.pending_items,
|
|
|
|
batch_deadline = ?self.pending_batch_timer.as_ref().map(|sleep| sleep.deadline()),
|
|
|
|
running_batches = self.concurrent_batches.len(),
|
|
|
|
error = ?self.failed,
|
|
|
|
"dropping batch worker",
|
|
|
|
);
|
|
|
|
|
2022-07-17 15:41:18 -07:00
|
|
|
// Fail pending tasks
|
|
|
|
self.failed(Closed::new().into());
|
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
// Fail queued requests
|
|
|
|
while let Ok(msg) = self.rx.try_recv() {
|
|
|
|
let _ = msg
|
|
|
|
.tx
|
|
|
|
.send(Err(self.failed.as_ref().expect("just set failed").clone()));
|
|
|
|
}
|
2022-07-17 15:41:18 -07:00
|
|
|
|
2022-07-17 15:43:29 -07:00
|
|
|
// Clear any finished batches, ignoring any errors.
|
|
|
|
// Ignore any batches that are still executing, because we can't cancel them.
|
|
|
|
//
|
|
|
|
// now_or_never() can stop futures waking up, but that's ok here,
|
|
|
|
// because we're manually polling, then dropping the stream.
|
|
|
|
while let Some(Some(_)) = self
|
|
|
|
.as_mut()
|
|
|
|
.project()
|
|
|
|
.concurrent_batches
|
|
|
|
.next()
|
|
|
|
.now_or_never()
|
|
|
|
{}
|
2021-03-15 10:21:29 -07:00
|
|
|
}
|
|
|
|
}
|