Change Discover to be a sealed trait (#443)

* Change Discover to be a sealed trait

`Discover` was _really_ just a `TryStream<Item = Change>`, so this
change makes that much clearer. Specifically, users are intended to use
`Discover` only in bounds, whereas implementors should implement
`Stream` with the appropriate `Item` type. `Discover` then comes with a
blanket implementation for anything that implements `TryStream`
appropriately. This obviates the need for the `discover::stream` module.
This commit is contained in:
Jon Gjengset 2020-04-17 16:27:44 -04:00 committed by GitHub
parent 5947e2e145
commit c87fdd9c1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 128 additions and 180 deletions

View File

@ -4,6 +4,7 @@ use crate::discover::Discover;
use futures_core::ready;
use pin_project::pin_project;
use rand::{rngs::SmallRng, SeedableRng};
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
future::Future,
@ -49,6 +50,7 @@ impl<S, Target, Req> Service<Target> for BalanceMake<S, Req>
where
S: Service<Target>,
S::Response: Discover,
<S::Response as Discover>::Key: Hash,
<S::Response as Discover>::Service: Service<Req>,
<<S::Response as Discover>::Service as Service<Req>>::Error: Into<error::Error>,
{
@ -73,6 +75,7 @@ impl<F, T, E, Req> Future for MakeFuture<F, Req>
where
F: Future<Output = Result<T, E>>,
T: Discover,
<T as Discover>::Key: Hash,
<T as Discover>::Service: Service<Req>,
<<T as Discover>::Service as Service<Req>>::Error: Into<error::Error>,
{

View File

@ -6,6 +6,7 @@ use futures_core::ready;
use futures_util::future::{self, TryFutureExt};
use pin_project::pin_project;
use rand::{rngs::SmallRng, SeedableRng};
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
fmt,
@ -37,7 +38,11 @@ use tracing::{debug, trace};
/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
/// [`Box::pin`]: https://doc.rust-lang.org/std/boxed/struct.Box.html#method.pin
/// [#319]: https://github.com/tower-rs/tower/issues/319
pub struct Balance<D: Discover, Req> {
pub struct Balance<D, Req>
where
D: Discover,
D::Key: Hash,
{
discover: D,
services: ReadyCache<D::Key, D::Service, Req>,
@ -51,7 +56,7 @@ pub struct Balance<D: Discover, Req> {
impl<D: Discover, Req> fmt::Debug for Balance<D, Req>
where
D: fmt::Debug,
D::Key: fmt::Debug,
D::Key: Hash + fmt::Debug,
D::Service: fmt::Debug,
Req: fmt::Debug,
{
@ -85,6 +90,7 @@ enum Error<E> {
impl<D, Req> Balance<D, Req>
where
D: Discover,
D::Key: Hash,
D::Service: Service<Req>,
<D::Service as Service<Req>>::Error: Into<error::Error>,
{
@ -114,7 +120,7 @@ where
impl<D, Req> Balance<D, Req>
where
D: Discover + Unpin,
D::Key: Clone,
D::Key: Hash + Clone,
D::Error: Into<error::Error>,
D::Service: Service<Req> + Load,
<D::Service as Load>::Metric: std::fmt::Debug,
@ -126,17 +132,19 @@ where
fn update_pending_from_discover(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(), error::Discover>> {
) -> Poll<Option<Result<(), error::Discover>>> {
debug!("updating from discover");
loop {
match ready!(Pin::new(&mut self.discover).poll_discover(cx))
.transpose()
.map_err(|e| error::Discover(e.into()))?
{
Change::Remove(key) => {
None => return Poll::Ready(None),
Some(Change::Remove(key)) => {
trace!("remove");
self.services.evict(&key);
}
Change::Insert(key, svc) => {
Some(Change::Insert(key, svc)) => {
trace!("insert");
// If this service already existed in the set, it will be
// replaced as the new one becomes ready.
@ -218,7 +226,7 @@ where
impl<D, Req> Service<Req> for Balance<D, Req>
where
D: Discover + Unpin,
D::Key: Clone,
D::Key: Hash + Clone,
D::Error: Into<error::Error>,
D::Service: Service<Req> + Load,
<D::Service as Load>::Metric: std::fmt::Debug,

View File

@ -16,10 +16,10 @@
use super::error;
use super::p2c::Balance;
use crate::discover::{Change, Discover};
use crate::discover::Change;
use crate::load::Load;
use crate::make::MakeService;
use futures_core::ready;
use futures_core::{ready, Stream};
use pin_project::pin_project;
use slab::Slab;
use std::{
@ -79,21 +79,16 @@ where
}
}
impl<MS, Target, Request> Discover for PoolDiscoverer<MS, Target, Request>
impl<MS, Target, Request> Stream for PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request>,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
Target: Clone,
{
type Key = usize;
type Service = DropNotifyService<MS::Service>;
type Error = MS::MakeError;
type Item = Result<Change<usize, DropNotifyService<MS::Service>>, MS::MakeError>;
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
while let Poll::Ready(Some(sid)) = this.died_rx.as_mut().poll_recv(cx) {
@ -148,7 +143,7 @@ where
message = "finished creating new service"
);
*this.load = Level::Normal;
return Poll::Ready(Ok(Change::Insert(id, svc)));
return Poll::Ready(Some(Ok(Change::Insert(id, svc))));
}
match this.load {
@ -167,7 +162,7 @@ where
pool.services = this.services.len(),
message = "removing service for over-provisioned pool"
);
Poll::Ready(Ok(Change::Remove(rm)))
Poll::Ready(Some(Ok(Change::Remove(rm))))
}
}
}

View File

@ -1,4 +1,5 @@
use super::{error::Never, Change, Discover};
use super::{error::Never, Change};
use futures_core::Stream;
use pin_project::pin_project;
use std::iter::{Enumerate, IntoIterator};
use std::{
@ -35,21 +36,16 @@ where
}
}
impl<T, U> Discover for ServiceList<T>
impl<T, U> Stream for ServiceList<T>
where
T: IntoIterator<Item = U>,
{
type Key = usize;
type Service = U;
type Error = Never;
type Item = Result<Change<usize, U>, Never>;
fn poll_discover(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project().inner.next() {
Some((i, service)) => Poll::Ready(Ok(Change::Insert(i, service))),
None => Poll::Pending,
Some((i, service)) => Poll::Ready(Some(Ok(Change::Insert(i, service)))),
None => Poll::Ready(None),
}
}
}

View File

@ -1,32 +1,60 @@
//! # Tower service discovery
//! Service discovery
//!
//! Service discovery is the automatic detection of services available to the
//! consumer. These services typically live on other servers and are accessible
//! via the network; however, it is possible to discover services available in
//! other processes or even in process.
//! This module provides the [`Change`] enum, which indicates the arrival or departure of a service
//! from a collection of similar services. Most implementations should use the [`Discover`] trait
//! in their bounds to indicate that they can handle services coming and going. `Discover` itself
//! is primarily a convenience wrapper around `TryStream<Ok = Change>`.
//!
//! Every discovered service is assigned an identifier that is distinct among the currently active
//! services. If that service later goes away, a `Change::Remove` is yielded with that service's
//! identifier. From that point forward, the identifier may be re-used.
//!
//! # Examples
//!
//! ```rust
//! use futures_util::{future::poll_fn, pin_mut};
//! use tower::discover::{Change, Discover};
//! async fn services_monitor<D: Discover>(services: D) {
//! pin_mut!(services);
//! while let Some(Ok(change)) = poll_fn(|cx| services.as_mut().poll_discover(cx)).await {
//! match change {
//! Change::Insert(key, svc) => {
//! // a new service with identifier `key` was discovered
//! # let _ = (key, svc);
//! }
//! Change::Remove(key) => {
//! // the service with identifier `key` has gone away
//! # let _ = (key);
//! }
//! }
//! }
//! }
//! ```
mod error;
mod list;
mod stream;
pub use self::{list::ServiceList, stream::ServiceStream};
pub use self::list::ServiceList;
use std::hash::Hash;
use std::ops;
use crate::sealed::Sealed;
use futures_core::TryStream;
use std::{
pin::Pin,
task::{Context, Poll},
};
/// Provide a uniform set of services able to satisfy a request.
/// A dynamically changing set of related services.
///
/// This set of services may be updated over time. On each change to the set, a
/// new `NewServiceSet` is yielded by `Discover`.
/// As new services arrive and old services are retired,
/// [`Change`]s are returned which provide unique identifiers
/// for the services.
///
/// See crate documentation for more details.
pub trait Discover {
/// NewService key
type Key: Hash + Eq;
pub trait Discover: Sealed<Change<(), ()>> {
/// A unique identifier for each active service.
///
/// An identifier can be re-used once a `Change::Remove` has been yielded for its service.
type Key: Eq;
/// The type of `Service` yielded by this `Discover`.
type Service;
@ -38,53 +66,34 @@ pub trait Discover {
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>>;
) -> Poll<Option<Result<Change<Self::Key, Self::Service>, Self::Error>>>;
}
// delegate through Pin
impl<P> Discover for Pin<P>
impl<K, S, E, D: ?Sized> Sealed<Change<(), ()>> for D
where
P: Unpin + ops::DerefMut,
P::Target: Discover,
D: TryStream<Ok = Change<K, S>, Error = E>,
K: Eq,
{
type Key = <<P as ops::Deref>::Target as Discover>::Key;
type Service = <<P as ops::Deref>::Target as Discover>::Service;
type Error = <<P as ops::Deref>::Target as Discover>::Error;
}
impl<K, S, E, D: ?Sized> Discover for D
where
D: TryStream<Ok = Change<K, S>, Error = E>,
K: Eq,
{
type Key = K;
type Service = S;
type Error = E;
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
Pin::get_mut(self).as_mut().poll_discover(cx)
}
}
impl<D: ?Sized + Discover + Unpin> Discover for &mut D {
type Key = D::Key;
type Service = D::Service;
type Error = D::Error;
fn poll_discover(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
Discover::poll_discover(Pin::new(&mut **self), cx)
) -> Poll<Option<Result<D::Ok, D::Error>>> {
TryStream::try_poll_next(self, cx)
}
}
impl<D: ?Sized + Discover + Unpin> Discover for Box<D> {
type Key = D::Key;
type Service = D::Service;
type Error = D::Error;
fn poll_discover(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
D::poll_discover(Pin::new(&mut *self), cx)
}
}
/// A change in the service set
/// A change in the service set.
#[derive(Debug)]
pub enum Change<K, V> {
/// A new service identified by key `K` was identified.

View File

@ -1,52 +0,0 @@
use super::{Change, Discover};
use futures_core::{ready, TryStream};
use pin_project::pin_project;
use std::hash::Hash;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service;
/// Dynamic service discovery based on a stream of service changes.
#[pin_project]
#[derive(Debug)]
pub struct ServiceStream<S> {
#[pin]
inner: S,
}
impl<S> ServiceStream<S> {
#[allow(missing_docs)]
pub fn new<K, Svc, Request>(services: S) -> Self
where
S: TryStream<Ok = Change<K, Svc>>,
K: Hash + Eq,
Svc: Service<Request>,
{
ServiceStream { inner: services }
}
}
impl<S, K, Svc> Discover for ServiceStream<S>
where
K: Hash + Eq,
S: TryStream<Ok = Change<K, Svc>>,
{
type Key = K;
type Service = Svc;
type Error = S::Error;
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
match ready!(self.project().inner.try_poll_next(cx)).transpose()? {
Some(c) => Poll::Ready(Ok(c)),
None => {
// there are no more service changes coming
Poll::Pending
}
}
}
}

View File

@ -76,3 +76,8 @@ pub use crate::builder::ServiceBuilder;
pub use tower_layer::Layer;
#[doc(inline)]
pub use tower_service::Service;
#[allow(unreachable_pub)]
mod sealed {
pub trait Sealed<T> {}
}

View File

@ -1,7 +1,7 @@
//! A constant `Load` implementation. Primarily useful for testing.
use crate::discover::{Change, Discover};
use futures_core::ready;
use futures_core::{ready, Stream};
use pin_project::pin_project;
use std::{
pin::Pin,
@ -55,24 +55,20 @@ where
}
/// Proxies `Discover` such that all changes are wrapped with a constant load.
impl<D: Discover + Unpin, M: Copy> Discover for Constant<D, M> {
type Key = D::Key;
type Service = Constant<D::Service, M>;
type Error = D::Error;
impl<D: Discover + Unpin, M: Copy> Stream for Constant<D, M> {
type Item = Result<Change<D::Key, Constant<D::Service, M>>, D::Error>;
/// Yields the next discovery change set.
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<D::Key, Self::Service>, D::Error>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use self::Change::*;
let this = self.project();
let change = match ready!(Pin::new(this.inner).poll_discover(cx))? {
Insert(k, svc) => Insert(k, Constant::new(svc, *this.load)),
Remove(k) => Remove(k),
let change = match ready!(Pin::new(this.inner).poll_discover(cx)).transpose()? {
None => return Poll::Ready(None),
Some(Insert(k, svc)) => Insert(k, Constant::new(svc, *this.load)),
Some(Remove(k)) => Remove(k),
};
Poll::Ready(Ok(change))
Poll::Ready(Some(Ok(change)))
}
}

View File

@ -3,7 +3,7 @@
use super::Load;
use super::{Instrument, InstrumentFuture, NoInstrument};
use crate::discover::{Change, Discover};
use futures_core::ready;
use futures_core::{ready, Stream};
use pin_project::pin_project;
use std::{
pin::Pin,
@ -110,23 +110,19 @@ impl<D, I> PeakEwmaDiscover<D, I> {
}
}
impl<D, I> Discover for PeakEwmaDiscover<D, I>
impl<D, I> Stream for PeakEwmaDiscover<D, I>
where
D: Discover,
I: Clone,
{
type Key = D::Key;
type Service = PeakEwma<D::Service, I>;
type Error = D::Error;
type Item = Result<Change<D::Key, PeakEwma<D::Service, I>>, D::Error>;
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<D::Key, Self::Service>, D::Error>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let change = match ready!(this.discover.poll_discover(cx))? {
Change::Remove(k) => Change::Remove(k),
Change::Insert(k, svc) => {
let change = match ready!(this.discover.poll_discover(cx)).transpose()? {
None => return Poll::Ready(None),
Some(Change::Remove(k)) => Change::Remove(k),
Some(Change::Insert(k, svc)) => {
let peak_ewma = PeakEwma::new(
svc,
*this.default_rtt,
@ -137,7 +133,7 @@ where
}
};
Poll::Ready(Ok(change))
Poll::Ready(Some(Ok(change)))
}
}

View File

@ -3,7 +3,7 @@
use super::Load;
use super::{Instrument, InstrumentFuture, NoInstrument};
use crate::discover::{Change, Discover};
use futures_core::ready;
use futures_core::{ready, Stream};
use pin_project::pin_project;
use std::sync::Arc;
use std::{
@ -106,29 +106,25 @@ impl<D, I> PendingRequestsDiscover<D, I> {
}
}
impl<D, I> Discover for PendingRequestsDiscover<D, I>
impl<D, I> Stream for PendingRequestsDiscover<D, I>
where
D: Discover,
I: Clone,
{
type Key = D::Key;
type Service = PendingRequests<D::Service, I>;
type Error = D::Error;
type Item = Result<Change<D::Key, PendingRequests<D::Service, I>>, D::Error>;
/// Yields the next discovery change set.
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<D::Key, Self::Service>, D::Error>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use self::Change::*;
let this = self.project();
let change = match ready!(this.discover.poll_discover(cx))? {
Insert(k, svc) => Insert(k, PendingRequests::new(svc, this.instrument.clone())),
Remove(k) => Remove(k),
let change = match ready!(this.discover.poll_discover(cx)).transpose()? {
None => return Poll::Ready(None),
Some(Insert(k, svc)) => Insert(k, PendingRequests::new(svc, this.instrument.clone())),
Some(Remove(k)) => Remove(k),
};
Poll::Ready(Ok(change))
Poll::Ready(Some(Ok(change)))
}
}

View File

@ -1,4 +1,4 @@
use super::sealed::Sealed;
use crate::sealed::Sealed;
use std::future::Future;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};

View File

@ -1,4 +1,4 @@
use super::sealed::Sealed;
use crate::sealed::Sealed;
use std::future::Future;
use std::task::{Context, Poll};
use tower_service::Service;

View File

@ -5,7 +5,3 @@ mod make_service;
pub use self::make_connection::MakeConnection;
pub use self::make_service::MakeService;
mod sealed {
pub trait Sealed<T> {}
}

View File

@ -4,7 +4,7 @@ use std::future::Future;
use std::task::{Context, Poll};
use tokio_test::{assert_pending, assert_ready, task};
use tower::balance::p2c::Balance;
use tower::discover::{Change, ServiceStream};
use tower::discover::Change;
use tower_service::Service;
use tower_test::mock;
@ -34,7 +34,7 @@ impl tower::load::Load for Mock {
fn stress() {
let mut task = task::spawn(());
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<_, &'static str>>();
let mut cache = Balance::<_, Req>::from_entropy(ServiceStream::new(rx));
let mut cache = Balance::<_, Req>::from_entropy(rx);
let mut nready = 0;
let mut services = slab::Slab::<(mock::Handle<Req, Req>, bool)>::new();