2020-06-12 10:22:08 -07:00
|
|
|
use super::{
|
|
|
|
future::ResponseFuture,
|
|
|
|
message::Message,
|
|
|
|
worker::{Handle, Worker},
|
2020-06-12 11:42:28 -07:00
|
|
|
BatchControl,
|
2020-06-12 10:22:08 -07:00
|
|
|
};
|
|
|
|
|
|
|
|
use futures_core::ready;
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
use std::task::{Context, Poll};
|
2020-06-12 10:22:08 -07:00
|
|
|
use tokio::sync::{mpsc, oneshot};
|
|
|
|
use tower::Service;
|
|
|
|
|
2020-06-12 11:42:28 -07:00
|
|
|
/// Allows batch processing of requests.
|
2020-06-12 10:22:08 -07:00
|
|
|
///
|
|
|
|
/// See the module documentation for more details.
|
|
|
|
#[derive(Debug)]
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
pub struct Batch<T, Request>
|
2020-06-12 10:22:08 -07:00
|
|
|
where
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
T: Service<BatchControl<Request>>,
|
2020-06-12 10:22:08 -07:00
|
|
|
{
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
tx: mpsc::Sender<Message<Request, T::Future>>,
|
|
|
|
handle: Handle,
|
2020-06-12 10:22:08 -07:00
|
|
|
}
|
|
|
|
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
impl<T, Request> Batch<T, Request>
|
2020-06-12 10:22:08 -07:00
|
|
|
where
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
T: Service<BatchControl<Request>>,
|
|
|
|
T::Error: Into<crate::BoxError>,
|
2020-06-12 10:22:08 -07:00
|
|
|
{
|
2020-06-12 11:42:28 -07:00
|
|
|
/// Creates a new `Batch` wrapping `service`.
|
2020-06-12 10:22:08 -07:00
|
|
|
///
|
2020-06-12 11:42:28 -07:00
|
|
|
/// The wrapper is responsible for telling the inner service when to flush a
|
|
|
|
/// batch of requests. Two parameters control this policy:
|
2020-06-12 10:22:08 -07:00
|
|
|
///
|
2020-06-12 11:42:28 -07:00
|
|
|
/// * `max_items` gives the maximum number of items per batch.
|
|
|
|
/// * `max_latency` gives the maximum latency for a batch item.
|
2020-06-12 10:22:08 -07:00
|
|
|
///
|
2020-06-12 11:42:28 -07:00
|
|
|
/// The default Tokio executor is used to run the given service, which means
|
|
|
|
/// that this method must be called while on the Tokio runtime.
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
pub fn new(service: T, max_items: usize, max_latency: std::time::Duration) -> Self
|
2020-06-12 10:22:08 -07:00
|
|
|
where
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
T: Send + 'static,
|
|
|
|
T::Future: Send,
|
|
|
|
T::Error: Send + Sync,
|
2020-06-12 10:22:08 -07:00
|
|
|
Request: Send + 'static,
|
|
|
|
{
|
2020-06-12 11:42:28 -07:00
|
|
|
// XXX(hdevalence): is this bound good
|
|
|
|
let (tx, rx) = mpsc::channel(1);
|
|
|
|
let (handle, worker) = Worker::new(service, rx, max_items, max_latency);
|
|
|
|
tokio::spawn(worker.run());
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
Batch { tx, handle }
|
2020-06-12 10:22:08 -07:00
|
|
|
}
|
|
|
|
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
fn get_worker_error(&self) -> crate::BoxError {
|
2020-06-12 10:22:08 -07:00
|
|
|
self.handle.get_error_on_closed()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
impl<T, Request> Service<Request> for Batch<T, Request>
|
2020-06-12 10:22:08 -07:00
|
|
|
where
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
T: Service<BatchControl<Request>>,
|
|
|
|
T::Error: Into<crate::BoxError>,
|
2020-06-12 10:22:08 -07:00
|
|
|
{
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
type Response = T::Response;
|
|
|
|
type Error = crate::BoxError;
|
|
|
|
type Future = ResponseFuture<T::Future>;
|
2020-06-12 10:22:08 -07:00
|
|
|
|
|
|
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
|
|
// If the inner service has errored, then we error here.
|
2020-06-16 17:51:50 -07:00
|
|
|
if ready!(self.tx.poll_ready(cx)).is_err() {
|
2020-06-12 10:22:08 -07:00
|
|
|
Poll::Ready(Err(self.get_worker_error()))
|
|
|
|
} else {
|
|
|
|
Poll::Ready(Ok(()))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn call(&mut self, request: Request) -> Self::Future {
|
|
|
|
// TODO:
|
|
|
|
// ideally we'd poll_ready again here so we don't allocate the oneshot
|
|
|
|
// if the try_send is about to fail, but sadly we can't call poll_ready
|
|
|
|
// outside of task context.
|
|
|
|
let (tx, rx) = oneshot::channel();
|
|
|
|
|
|
|
|
// get the current Span so that we can explicitly propagate it to the worker
|
|
|
|
// if we didn't do this, events on the worker related to this span wouldn't be counted
|
|
|
|
// towards that span since the worker would have no way of entering it.
|
|
|
|
let span = tracing::Span::current();
|
2020-06-12 11:42:28 -07:00
|
|
|
tracing::trace!(parent: &span, "sending request to batch worker");
|
2020-06-12 10:22:08 -07:00
|
|
|
match self.tx.try_send(Message { request, span, tx }) {
|
|
|
|
Err(mpsc::error::TrySendError::Closed(_)) => {
|
|
|
|
ResponseFuture::failed(self.get_worker_error())
|
|
|
|
}
|
|
|
|
Err(mpsc::error::TrySendError::Full(_)) => {
|
|
|
|
// When `mpsc::Sender::poll_ready` returns `Ready`, a slot
|
|
|
|
// in the channel is reserved for the handle. Other `Sender`
|
|
|
|
// handles may not send a message using that slot. This
|
|
|
|
// guarantees capacity for `request`.
|
|
|
|
//
|
|
|
|
// Given this, the only way to hit this code path is if
|
|
|
|
// `poll_ready` has not been called & `Ready` returned.
|
|
|
|
panic!("buffer full; poll_ready must be called first");
|
|
|
|
}
|
|
|
|
Ok(_) => ResponseFuture::new(rx),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
impl<T, Request> Clone for Batch<T, Request>
|
2020-06-12 10:22:08 -07:00
|
|
|
where
|
Revert #500 (generic errors in tower-batch).
Unfortunately, since the Batch wrapper was changed to have a generic error
type, when wrapping it in another Service, nothing constrains the error type,
so we have to specify it explicitly to avoid an inference hole. This is pretty
unergonomic -- from the compiler error message it's very unintuitive that the
right fix is to change `Batch::new` to `Batch::<_, _, SomeError>::new`.
The options are:
1. roll back the changes that make the error type generic, so that the error
type is a concrete type;
2. keep the error type generic but hardcode the error in the default
constructor and add an additional code path that allows overriding the
error.
However, there's a further issue with generic errors: the error type must be
Clone. This problem comes from the fact that there can be multiple Batch
handles that have to share access to errors generated by the inner Batch
worker, so there's not a way to work around this. However, almost all error
types aren't Clone, so there are fairly few error types that we would be
swapping in.
This suggests that in case (2) we would be maintaining extra code to allow
generic errors, but with restrictive enough generic bounds to make it
impractical to use generic error types. For this reason I think that (1) is a
better option.
2020-07-15 21:42:57 -07:00
|
|
|
T: Service<BatchControl<Request>>,
|
2020-06-12 10:22:08 -07:00
|
|
|
{
|
|
|
|
fn clone(&self) -> Self {
|
|
|
|
Self {
|
|
|
|
tx: self.tx.clone(),
|
|
|
|
handle: self.handle.clone(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|