Compare commits

...

36 Commits

Author SHA1 Message Date
Taiki Endo b12a3e3ae9
Remove uses of pin_project::project attribute (#458)
pin-project will deprecate the project attribute due to some unfixable
limitations.

Refs: https://github.com/taiki-e/pin-project/issues/225
2020-06-15 12:38:34 -04:00
Lucio Franco 007b648ea9
Clean up readme and update status (#453) 2020-05-08 13:54:45 -04:00
Bruce Guenter 98e0e41db1
Rework ConcurrencyLimit to use upstream tokio Semaphore (#451) 2020-05-06 11:06:40 -04:00
Lucio Franco a0a66b10a2
Upgrade cargo deny action (#452) 2020-05-06 09:45:57 -04:00
Jon Gjengset 1c2d50680a
Spring cleaning for tower::balance (#449)
Noteworthy changes:

 - All constructors now follow the same pattern: `new` uses OS entropy,
   `from_rng` takes a `R: Rng` and seeds the randomness from there.
   `from_rng` is fallible, since randomness generators can be fallible.
 - `BalanceLayer` was renamed to `MakeBalanceLayer`, since it is not
   _really_ a `BalanceLayer`. The name of `BalanceMake` was also
   "normalized" to `MakeBalance`.

Another observation: the `Debug` bound on `Load::Metric` in
`p2c::Balance`, while not particularly onerous, generates really
confusing errors if you forget it include it. And crucially, the error
never points at `Debug` (should we file a compiler issue?), so I pretty
much had to guess my way to that being wrong in the doc example.

It would probably be useful to add a documentation example to
`MakeBalanceLayer` or `MakeBalance` (I suspect just one of them is fine,
since they're basically the same). Since I've never used it, and find it
hard to think of uses for it, it might be good if someone with more
experience with it wrote one.
2020-04-24 13:21:11 -04:00
Jon Gjengset 6a25d322b5 Use only one alias for Box<dyn Error>
This was a mostly mechanical change. I think in at least one place it
results in a `'static` bound being added, but the next tower release
will be breaking anyway, so that's okay.

I think it helps to also document the alias at the top to (eventually)
explain how people can interact with the error they get back to discover
the "deeper cause".
2020-04-24 10:30:20 -04:00
Eliza Weisman 8752a38117
util: fix oneshot dropping pending services immediately (#447)
## Motivation

Commit #330 introduced a regression when porting `tower-util::Oneshot`
from `futures` 0.1 to `std::future`. The *intended* behavior is that a
oneshot future should repeatedly call `poll_ready` on the oneshotted
service until it is ready, and then call the service and drive the
returned future. However, #330 inadvertently changed the oneshot future
to poll the service _once_, call it if it is ready, and then drop it,
regardless of its readiness.

In the #330 version of oneshot, an `Option` is used to store the
request while waiting for the service to become ready, so that it can be
`take`n and moved into the service's `call`. However, the `Option`
contains both the request _and_ the service itself, and is taken the
first time the service is polled. `futures::ready!` is then used when
polling the service, so the method returns immediate if it is not ready.
This means that the service itself (and the request), which were taken
out of the `Option`, will be dropped, and if the oneshot future is
polled again, it will panic.

## Solution

This commit changes the `Oneshot` future so that only the request lives
in the `Option`, and it is only taken when the service is called, rather
than every time it is polled. This fixes the bug.

I've also added a test for this which fails against master, but passes
after this change.
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
2020-04-23 16:07:48 -07:00
Steven Fackler 82e578b5b0
Impl Layer for &Layer (#446) 2020-04-21 17:11:27 -04:00
Jon Gjengset 39112cb0ba
Tidy up tower::load (#445)
This also renames the `Instrument` trait, and related types, to better
reflect what they do. Specifically, the trait is now called
`TrackCompletion`, and `NoInstrument` is called `CompleteOnResponse`.

Also brings back balance example and makes it compile.
2020-04-20 14:55:40 -04:00
Jon Gjengset 05b165056b
Tidy up tower::buffer (#444) 2020-04-17 17:41:51 -04:00
Jon Gjengset c87fdd9c1e
Change Discover to be a sealed trait (#443)
* Change Discover to be a sealed trait

`Discover` was _really_ just a `TryStream<Item = Change>`, so this
change makes that much clearer. Specifically, users are intended to use
`Discover` only in bounds, whereas implementors should implement
`Stream` with the appropriate `Item` type. `Discover` then comes with a
blanket implementation for anything that implements `TryStream`
appropriately. This obviates the need for the `discover::stream` module.
2020-04-17 16:27:44 -04:00
Jon Gjengset 5947e2e145
Some more spring clean fixes. (#442)
* Add doc feature annotations

* Modules should be published or removed
2020-04-17 16:03:15 -04:00
Lucio Franco 85b657bf93
Remove path deps for `tower-service` (#441) 2020-04-17 14:00:38 -04:00
Lucio Franco 5e1788f494
rate: Fix rate limit not resetting (#439) 2020-04-16 11:31:58 -04:00
Lucio Franco cd7dd12315
Refactor github actions (#436)
Signed-off-by: Lucio Franco <luciofranco14@gmail.com>
2020-04-14 19:20:20 -04:00
Lucio Franco 8a73440c1a
reconnect: Rework to allow real reconnecting (#437)
Signed-off-by: Lucio Franco <luciofranco14@gmail.com>
Co-authored-by: Jon Gjengset <jon@thesquareplanet.com>
2020-04-14 16:42:37 -04:00
Lucio Franco d34019045f
Add `Map` service combinator (#435)
Signed-off-by: Lucio Franco <luciofranco14@gmail.com>
Co-authored-by: David Barsky <dbarsky@amazon.com>
2020-04-14 15:16:16 -04:00
Akshay Narayan 0520a6a467
New sub-crate: tower-steer (#426) 2020-03-31 21:26:13 -04:00
Jon Gjengset 81cfbab19e
Merge pull request #432 from tower-rs/2020-spring-clean
2020: merge all the middleware
2020-03-31 16:55:48 -04:00
Jon Gjengset 9dd2314048
step 4: make features do the right thing 2020-03-31 16:26:53 -04:00
Jon Gjengset 2e06782241
step 3: make ci work again 2020-03-31 16:26:52 -04:00
Jon Gjengset c4d70b535b
step 2: make all the tests work again 2020-03-31 16:12:32 -04:00
Jon Gjengset 8df2a3e410
step 1: move all things to where they're going
Note that this also moves all crates from `log` to `tracing`.
It also does not set any dependencies as optional.
2020-03-31 13:31:21 -04:00
Jon Gjengset 0f9eb648a5
limit: prepare 0.3.1 release (#430) 2020-03-25 19:51:59 -04:00
Jon Gjengset 378433fc75
limit: Forward tower_load::Load (#429) 2020-03-25 19:46:05 -04:00
Jon Gjengset b575175210
util: prepare 0.3.1 release (#428) 2020-03-23 13:02:43 -04:00
Jon Gjengset 52fde9767c
util: Add ReadyAnd to do what Ready should do (#427)
* util: Add ReadyAnd to do what Ready should do

`ServiceExt::ready` says that it produces "A future yielding the service
when it is ready to accept a request." This is not true; it does _not_
yield the service when it is ready, it yields unit. This makes it
impossible to chain service ready with service call, which is sad.

This PR adds `ready_and`, which does what `ready` promised. It also
deprecates `ready` with the intention that we remove `ready` in a future
version, and make the strictly more general `ready_and` take its place.
We can't do it now since it's not a backwards-compatible change even
though it _probably_ wouldn't break any code.

The PR also updates the docs so that they reflect the observed behavior.
2020-03-23 12:49:44 -04:00
Jon Gjengset b6f5f586c5
Add Buffer::new note on how to set bound (#425) 2020-03-04 15:48:33 -05:00
Jake Ham 52d9e95a38
Fix documentation links in README (#422)
Updated the README, fixing the links to documentation. This now links
to each packages documentation on docs.rs. Not all packages have been
released to crates.io, so their documentation pages are empty.
2020-02-27 11:42:08 -05:00
Jon Gjengset ba1fdd755b ready-cache: Prepare for 0.3.1 release
This also fixes up the various documentation URLs, which were still
pointing to 0.1.x.
2020-02-24 13:14:23 -05:00
Jon Gjengset 414e3b0809
ready-cache: Avoid panic on strange race (#420)
It's been observed that occasionally tower-ready-cache would panic
trying to find an already canceled service in `cancel_pending_txs`
(#415). The source of the race is not entirely clear, but extensive
debugging demonstrated that occasionally a call to `evict` would send on
the `CancelTx` for a service, yet that service would be yielded back
from `pending` in `poll_pending` in a non-`Canceled` state. This
is equivalent to saying that this code may panic:

```rust
async {
  let (tx, rx) = oneshot::channel();
  tx.send(42).unwrap();
  yield_once().await;
  rx.try_recv().unwrap(); // <- may occasionally panic
}
```

I have not been able to demonstrate a self-contained example failing in
this way, but it's the only explanation I have found for the observed
bug. Pinning the entire runtime to one core still produced the bug,
which indicates that it is not a memory ordering issue. Replacing
oneshot with `mpsc::channel(1)` still produced the bug, which indicates
that the bug is not with the implementation of `oneshot`. Logs also
indicate that the `ChannelTx` we send on in `evict()` truly is the same
one associated with the `ChannelRx` polled in `Pending::poll`, so we're
not getting our wires crossed somewhere. It truly is bizarre.

This patch resolves the issue by considering a failure to find a
ready/errored service's `CancelTx` as another signal that a service has
been removed. Specifically, if `poll_pending` finds a service that
returns `Ok` or `Err`, but does _not_ find its `CancelTx`, then it
assumes that it must be because the service _was_ canceled, but did not
observe that cancellation signal.

As an explanation, this isn't entirely satisfactory, since we do not
fully understand the underlying problem. It _may_ be that a canceled
service could remain in the pending state for a very long time if it
does not become ready _and_ does not see the cancellation signal (so it
returns `Poll::Pending` and is not removed). That, in turn, might cause
an issue if the driver of the `ReadyCache` then chooses to re-use a key
they believe they have evicted. However, any such case _must_ first hit
the panic that exists in the code today, so this is still an improvement
over the status quo.

Fixes #415.
2020-02-24 13:03:43 -05:00
Jon Gjengset be156e733d ready-cache: restore assert for dropped cancel tx
When ready-cache was upgraded from futures 0.1 to `std::future` in
e2f1a49cf3, this `expect` was removed, and
the code instead silently ignores the error. That's probably not what we
want, so this patch restores that assertion.
2020-02-20 17:08:07 -05:00
Jon Gjengset 1a67100aab Restore commented-out p2c assertion 2020-02-20 16:33:54 -05:00
Jon Gjengset ae34c9b4a1 Add more tower-ready-cache tests 2020-02-20 16:33:54 -05:00
Jon Gjengset 96529148d8 Remove irrelevant comment
The assertion there isn't even true anyway, since the p2c may not yet
have "seen" the removal of a service, because it stopped when it found a
ready service.
2020-02-20 16:01:19 -05:00
Jon Gjengset 650e5be58e balance: Add a stress test for p2c
The hope for this was to reproduce #415 (which it does not sadly), but
at least it adds a test for p2c!
2020-02-20 16:01:19 -05:00
202 changed files with 2747 additions and 4434 deletions

View File

@ -7,133 +7,125 @@ on:
pull_request: {}
jobs:
test-workspace:
runs-on: ${{ matrix.os }}
check:
# Run `cargo check` first to ensure that the pushed code at least compiles.
runs-on: ubuntu-latest
strategy:
matrix:
os: [ubuntu-18.04, windows-2019, macOS-10.14]
rust: [stable, nightly]
rust: [stable, 1.40.0]
steps:
- name: Checkout code
uses: actions/checkout@v1
- name: Install rust toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
override: true
- name: Build
uses: actions-rs/cargo@v1
with:
command: check
args: --verbose
- name: Run tests
uses: actions-rs/cargo@v1
env:
CI: 'True'
RUSTFLAGS: '-D warnings'
with:
command: test
args: --verbose
test:
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
profile: minimal
- name: Check
uses: actions-rs/cargo@v1
with:
command: check
args: --all --all-targets --all-features
cache-cargo-hack:
runs-on: ubuntu-latest
steps:
- name: Fetch latest release version of cargo-hack
run: |
mkdir -p .github/caching
curl -sL https://api.github.com/repos/taiki-e/cargo-hack/releases/latest | jq -r '.name' > .github/caching/cargo-hack.lock
- name: Cache cargo-hack/bin
id: cache-cargo-hack
uses: actions/cache@v1
with:
path: ${{ runner.tool_cache }}/cargo-hack/bin
key: cargo-hack-bin-${{ hashFiles('.github/caching/cargo-hack.lock') }}
- name: Install cargo-hack
if: "steps.cache-cargo-hack.outputs.cache-hit != 'true'"
uses: actions-rs/cargo@v1
with:
command: install
args: --root ${{ runner.tool_cache }}/cargo-hack --force cargo-hack
cargo-hack:
needs: cache-cargo-hack
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
- name: Fetch latest release version of cargo-hack
run: |
mkdir -p .github/caching
curl -sL https://api.github.com/repos/taiki-e/cargo-hack/releases/latest | jq -r '.name' > .github/caching/cargo-hack.lock
- name: Restore cargo-hack/bin
uses: actions/cache@v1
with:
path: ${{ runner.tool_cache }}/cargo-hack/bin
key: cargo-hack-bin-${{ hashFiles('.github/caching/cargo-hack.lock') }}
- run: echo "::add-path::${{ runner.tool_cache }}/cargo-hack/bin"
# if `cargo-hack` somehow doesn't exist after loading it from the cache,
# make *sure* it's there.
- run: cargo hack --help || { cargo install --force cargo-hack; }
- name: cargo hack check
working-directory: ${{ matrix.subcrate }}
run: cargo hack check --each-feature --no-dev-deps --all
test-versions:
# Test against the stable, beta, and nightly Rust toolchains on ubuntu-latest.
needs: check
runs-on: ubuntu-latest
strategy:
matrix:
crate:
- tower
- tower-balance
- tower-buffer
- tower-discover
- tower-filter
- tower-hedge
- tower-layer
- tower-limit
- tower-load
- tower-load-shed
- tower-make
- tower-ready-cache
- tower-reconnect
- tower-retry
- tower-service
- tower-spawn-ready
- tower-test
- tower-timeout
- tower-util
rust: [stable, beta, nightly, 1.40.0]
steps:
- name: Checkout code
uses: actions/checkout@v1
- name: Install rust toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
- name: Patch
run: |
set -e
# Remove any existing patch statements
mv Cargo.toml Cargo.toml.bck
sed -n '/\[patch.crates-io\]/q;p' Cargo.toml.bck > Cargo.toml
# Patch all crates
cat .github/workflows/patch.toml >> Cargo.toml
# Print `Cargo.toml` for debugging
echo "~~~~ Cargo.toml ~~~~"
cat Cargo.toml
echo "~~~~~~~~~~~~~~~~~~~~"
- name: Build
uses: actions-rs/cargo@v1
with:
command: check
args: -p ${{ matrix.crate }} --verbose
- name: Run tests
uses: actions-rs/cargo@v1
env:
CI: 'True'
RUSTFLAGS: '-D warnings'
with:
command: test
args: -p ${{ matrix.crate }} --verbose
rustfmt:
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
profile: minimal
- name: Run tests
uses: actions-rs/cargo@v1
with:
command: test
args: --all --all-features
style:
# Check style.
needs: check
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v1
- name: Ensure that rustfmt is installed
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
components: rustfmt
- name: Run rustfmt
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
components: rustfmt
profile: minimal
- name: rustfmt
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
# warnings:
# # Check for any warnings. This is informational and thus is allowed to fail.
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@master
# - uses: actions-rs/toolchain@v1
# with:
# toolchain: stable
# components: clippy
# profile: minimal
# - name: Clippy
# uses: actions-rs/clippy-check@v1
# with:
# token: ${{ secrets.GITHUB_TOKEN }}
# args: --all --all-targets --all-features -- -D warnings
# This is failing, because it finds errors...
# clippy_check:
#
# runs-on: ubuntu-latest
#
# steps:
# - name: Checkout code
# uses: actions/checkout@v1
# - name: Ensure that clippy is installed
# uses: actions-rs/toolchain@v1
# with:
# toolchain: nightly
# override: true
# components: clippy
# - name: Run clippy
# uses: actions-rs/clippy-check@v1
# with:
# token: ${{ secrets.GITHUB_TOKEN }}
# args: --all-targets --all-features
deny-check:
name: cargo-deny check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: EmbarkStudios/cargo-deny-action@v1

View File

@ -2,17 +2,6 @@
# repository.
[patch.crates-io]
tower = { path = "tower" }
tower-balance = { path = "tower-balance" }
tower-buffer = { path = "tower-buffer" }
tower-discover = { path = "tower-discover" }
tower-filter = { path = "tower-filter" }
tower-layer = { path = "tower-layer" }
tower-limit = { path = "tower-limit" }
tower-load-shed = { path = "tower-load-shed" }
tower-reconnect = { path = "tower-reconnect" }
tower-retry = { path = "tower-retry" }
tower-service = { path = "tower-service" }
tower-spawn-ready = { path = "tower-spawn-ready" }
tower-test = { path = "tower-test" }
tower-timeout = { path = "tower-timeout" }
tower-util = { path = "tower-util" }

View File

@ -2,22 +2,7 @@
members = [
"tower",
"tower-balance",
"tower-buffer",
"tower-discover",
"tower-filter",
"tower-hedge",
"tower-layer",
"tower-limit",
"tower-load",
"tower-load-shed",
"tower-ready-cache",
"tower-reconnect",
"tower-retry",
"tower-service",
"tower-spawn-ready",
"tower-test",
"tower-timeout",
"tower-make",
"tower-util",
]

View File

@ -17,52 +17,11 @@ Tower aims to make it as easy as possible to build robust networking clients and
servers. It is protocol agnostic, but is designed around a request / response
pattern. If your protocol is entirely stream based, Tower may not be a good fit.
## Project Layout
Tower consists of a number of components, each of which live in their own sub
crates.
* [`tower`]: The main user facing crate that provides batteries included tower services ([docs][t-docs]).
* [`tower-service`]: The foundational traits upon which Tower is built
([docs][ts-docs]).
* [`tower-layer`]: The foundational trait to compose services together
([docs][tl-docs]).
* [`tower-balance`]: A load balancer. Load is balanced across a number of
services ([docs][tb-docs]).
* [`tower-buffer`]: A buffering middleware. If the inner service is not ready to
handle the next request, `tower-buffer` stores the request in an internal
queue ([docs][tbuf-docs]).
* [`tower-discover`]: Service discovery abstraction ([docs][td-docs]).
* [`tower-filter`]: Middleware that conditionally dispatch requests to the inner
service based on a predicate ([docs][tf-docs]).
* [`tower-limit`]: Middleware limiting the number of requests that are
processed ([docs][tlim-docs]).
* [`tower-reconnect`]: Middleware that automatically reconnects the inner
service when it becomes degraded ([docs][tre-docs]).
* [`tower-retry`]: Middleware that retries requests based on a given `Policy`
([docs][tretry-docs]).
* [`tower-test`]: Testing utilies ([docs][ttst-docs]).
* [`tower-timeout`]: Middleware that applies a timeout to requests
([docs][tt-docs]).
* [`tower-util`]: Miscellaneous additional utilities for Tower
([docs][tu-docs]).
## Status
Currently, only [`tower-service`], the foundational trait, has been released to
crates.io. The rest of the library will be following shortly.
Currently, `tower 0.3` is released on crates. We are currently working on cleaning
up the codebase and adding more documentation. You can follow our progress in
this [issue](https://github.com/tower-rs/tower/issues/431).
## License
@ -73,31 +32,3 @@ This project is licensed under the [MIT license](LICENSE).
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.
[`tower`]: tower
[t-docs]: https://tower-rs.github.io/tower/doc/tower/index.html
[`tower-service`]: tower-service
[ts-docs]: https://docs.rs/tower-service/
[`tower-layer`]: tower-layer
[tl-docs]: https://docs.rs/tower-layer/
[`tower-balance`]: tower-balance
[tb-docs]: https://tower-rs.github.io/tower/doc/tower_balance/index.html
[`tower-buffer`]: tower-buffer
[tbuf-docs]: https://tower-rs.github.io/tower/doc/tower_buffer/index.html
[`tower-discover`]: tower-discover
[td-docs]: https://tower-rs.github.io/tower/doc/tower_discover/index.html
[`tower-filter`]: tower-filter
[tf-docs]: https://tower-rs.github.io/tower/doc/tower_filter/index.html
[`tower-limit`]: tower-limit
[tlim-docs]: https://tower-rs.github.io/tower/doc/tower_limit/index.html
[`tower-reconnect`]: tower-reconnect
[tre-docs]: https://tower-rs.github.io/tower/doc/tower_reconnect/index.html
[`tower-retry`]: tower-retry
[tretry-docs]: https://tower-rs.github.io/tower/doc/tower_retry/index.html
[`tower-timeout`]: tower-timeout
[`tower-test`]: tower-test
[ttst-docs]: https://tower-rs.github.io/tower/doc/tower_test/index.html
[`tower-rate-limit`]: tower-rate-limit
[tt-docs]: https://tower-rs.github.io/tower/doc/tower_timeout/index.html
[`tower-util`]: tower-util
[tu-docs]: https://tower-rs.github.io/tower/doc/tower_util/index.html

25
deny.toml Normal file
View File

@ -0,0 +1,25 @@
[advisories]
vulnerability = "deny"
unmaintained = "warn"
notice = "warn"
ignore = []
[licenses]
unlicensed = "deny"
allow = []
deny = []
copyleft = "warn"
allow-osi-fsf-free = "either"
confidence-threshold = 0.8
[bans]
multiple-versions = "deny"
highlight = "all"
skip-tree = [
{ name = "tower", version = "=0.3"}
]
[sources]
unknown-registry = "warn"
unknown-git = "warn"
allow-git = []

View File

@ -1,18 +0,0 @@
# 0.3.0 (December 4, 2019)
- Update to `tower-service 0.3`
- Update to `tower-ready-cache 0.3`
- Update to `futures 0.3`
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (unreleased)
- Initial release

View File

@ -1,53 +0,0 @@
[package]
name = "tower-balance"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-balance/0.3.0"
description = """
Balance load across a set of uniform services.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[features]
log = ["tracing/log"]
default = ["log"]
[dependencies]
futures-util = { version = "0.3", default-features = false }
futures-core = { version = "0.3", default-features = false }
pin-project = "0.4"
indexmap = "1.0.2"
tracing = "0.1"
rand = { version = "0.7", features = ["small_rng"] }
tokio = { version = "0.2", features = ["sync", "time"] }
tower-discover = { version = "0.3", path = "../tower-discover" }
tower-layer = "0.3"
tower-load = { version = "0.3", path = "../tower-load" }
tower-service = "0.3"
tower-ready-cache = { version = "0.3", path = "../tower-ready-cache" }
tower-make = "0.3"
slab = "0.4"
[dev-dependencies]
tracing-subscriber = "0.1.1"
hdrhistogram = "6.0"
quickcheck = { version = "0.6", default-features = false }
tokio = { version = "0.2", features = ["macros"] }
tokio-test = "0.2"
tower-buffer = { version = "0.3", path = "../tower-buffer" }
tower-limit = { version = "0.3", path = "../tower-limit" }
tower-test = { version = "0.3", path = "../tower-test" }
tower = { version = "0.3", path = "../tower" }

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Balance
Balance load across a set of uniform services.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,14 +0,0 @@
//! Load balancing middlewares.
#![doc(html_root_url = "https://docs.rs/tower-balance/0.3.0")]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![allow(elided_lifetimes_in_paths)]
pub mod error;
pub mod p2c;
pub mod pool;

View File

@ -1,48 +0,0 @@
use super::BalanceMake;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::{fmt, marker::PhantomData};
use tower_layer::Layer;
/// Efficiently distributes requests across an arbitrary number of services
#[derive(Clone)]
pub struct BalanceLayer<D, Req> {
rng: SmallRng,
_marker: PhantomData<fn(D, Req)>,
}
impl<D, Req> BalanceLayer<D, Req> {
/// Builds a balancer using the system entropy.
pub fn new() -> Self {
Self {
rng: SmallRng::from_entropy(),
_marker: PhantomData,
}
}
/// Builds a balancer from the provided RNG.
///
/// This may be preferrable when many balancers are initialized.
pub fn from_rng<R: Rng>(rng: &mut R) -> Result<Self, rand::Error> {
let rng = SmallRng::from_rng(rng)?;
Ok(Self {
rng,
_marker: PhantomData,
})
}
}
impl<S, Req> Layer<S> for BalanceLayer<S, Req> {
type Service = BalanceMake<S, Req>;
fn layer(&self, make_discover: S) -> Self::Service {
BalanceMake::new(make_discover, self.rng.clone())
}
}
impl<D, Req> fmt::Debug for BalanceLayer<D, Req> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BalanceLayer")
.field("rng", &self.rng)
.finish()
}
}

View File

@ -1,12 +0,0 @@
//! A Power-of-Two-Choices Load Balancer
mod layer;
mod make;
mod service;
#[cfg(test)]
mod test;
pub use layer::BalanceLayer;
pub use make::{BalanceMake, MakeFuture};
pub use service::Balance;

View File

@ -1,31 +0,0 @@
# 0.3.0 (December 19, 2019)
- Update to `tokio 0.2`
- Update to `futures-core 0.3`
- Update to `tower-service 0.3`
- Update to `tower-layer 0.3`
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1b (September 13, 2019)
- Remove `Stream` unused warning.
# 0.3.0-alpha.1a (September 13, 2019)
- Fix `poll_next` not exisitng.
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.1 (July 19, 2019)
- Add `tracing` support
# 0.1.0 (April 26, 2019)
- Initial release

View File

@ -1,39 +0,0 @@
[package]
name = "tower-buffer"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-buffer/0.3.0"
description = """
Buffer requests before dispatching to a `Service`.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[features]
log = ["tracing/log"]
default = ["log"]
[dependencies]
futures-core = { version = "0.3", default-features = false }
pin-project = "0.4"
tower-service = "0.3"
tower-layer = "0.3"
tokio = { version = "0.2", features = ["rt-core", "sync"] }
tracing = "0.1.2"
[dev-dependencies]
tower-test = { version = "0.3", path = "../tower-test" }
tokio-test = { version = "0.2" }
tokio = { version = "0.2", features = ["macros"] }

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Buffer
Buffer requests before dispatching to a `Service`.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,42 +0,0 @@
use crate::{error::Error, service::Buffer};
use std::{fmt, marker::PhantomData};
use tower_layer::Layer;
use tower_service::Service;
/// Buffer requests with a bounded buffer
pub struct BufferLayer<Request> {
bound: usize,
_p: PhantomData<fn(Request)>,
}
impl<Request> BufferLayer<Request> {
/// Create a new `BufferLayer` with the provided `bound`.
pub fn new(bound: usize) -> Self {
BufferLayer {
bound,
_p: PhantomData,
}
}
}
impl<S, Request> Layer<S> for BufferLayer<Request>
where
S: Service<Request> + Send + 'static,
S::Future: Send,
S::Error: Into<Error> + Send + Sync,
Request: Send + 'static,
{
type Service = Buffer<S, Request>;
fn layer(&self, service: S) -> Self::Service {
Buffer::new(service, self.bound)
}
}
impl<Request> fmt::Debug for BufferLayer<Request> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BufferLayer")
.field("bound", &self.bound)
.finish()
}
}

View File

@ -1,25 +0,0 @@
#![doc(html_root_url = "https://docs.rs/tower-buffer/0.3.0")]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![allow(elided_lifetimes_in_paths)]
//! Buffer requests when the inner service is out of capacity.
//!
//! Buffering works by spawning a new task that is dedicated to pulling requests
//! out of the buffer and dispatching them to the inner service. By adding a
//! buffer and a dedicated task, the `Buffer` layer in front of the service can
//! be `Clone` even if the inner service is not.
pub mod error;
pub mod future;
mod layer;
mod message;
mod service;
mod worker;
pub use crate::layer::BufferLayer;
pub use crate::service::Buffer;

View File

@ -1,17 +0,0 @@
# 0.3.0 (December 19, 2019)
- Update to `tower-service 0.3`
- Update to `futures-core 0.3`
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (April 26, 2019)
- Initial release

View File

@ -1,27 +0,0 @@
[package]
name = "tower-discover"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-discover/0.3.0"
description = """
Abstracts over service discovery strategies.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
futures-core = { version = "0.3", default-features = false }
tower-service = "0.3"
pin-project = "0.4"

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Discovery
Abstracts over service discovery strategies.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,103 +0,0 @@
#![doc(html_root_url = "https://docs.rs/tower-discover/0.3.0")]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![allow(elided_lifetimes_in_paths)]
//! # Tower service discovery
//!
//! Service discovery is the automatic detection of services available to the
//! consumer. These services typically live on other servers and are accessible
//! via the network; however, it is possible to discover services available in
//! other processes or even in process.
mod error;
mod list;
mod stream;
pub use crate::{list::ServiceList, stream::ServiceStream};
use std::hash::Hash;
use std::ops;
use std::{
pin::Pin,
task::{Context, Poll},
};
/// Provide a uniform set of services able to satisfy a request.
///
/// This set of services may be updated over time. On each change to the set, a
/// new `NewServiceSet` is yielded by `Discover`.
///
/// See crate documentation for more details.
pub trait Discover {
/// NewService key
type Key: Hash + Eq;
/// The type of `Service` yielded by this `Discover`.
type Service;
/// Error produced during discovery
type Error;
/// Yields the next discovery change set.
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>>;
}
// delegate through Pin
impl<P> Discover for Pin<P>
where
P: Unpin + ops::DerefMut,
P::Target: Discover,
{
type Key = <<P as ops::Deref>::Target as Discover>::Key;
type Service = <<P as ops::Deref>::Target as Discover>::Service;
type Error = <<P as ops::Deref>::Target as Discover>::Error;
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
Pin::get_mut(self).as_mut().poll_discover(cx)
}
}
impl<D: ?Sized + Discover + Unpin> Discover for &mut D {
type Key = D::Key;
type Service = D::Service;
type Error = D::Error;
fn poll_discover(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
Discover::poll_discover(Pin::new(&mut **self), cx)
}
}
impl<D: ?Sized + Discover + Unpin> Discover for Box<D> {
type Key = D::Key;
type Service = D::Service;
type Error = D::Error;
fn poll_discover(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
D::poll_discover(Pin::new(&mut *self), cx)
}
}
/// A change in the service set
#[derive(Debug)]
pub enum Change<K, V> {
/// A new service identified by key `K` was identified.
Insert(K, V),
/// The service identified by key `K` disappeared.
Remove(K),
}

View File

@ -1,52 +0,0 @@
use crate::{Change, Discover};
use futures_core::{ready, TryStream};
use pin_project::pin_project;
use std::hash::Hash;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service;
/// Dynamic service discovery based on a stream of service changes.
#[pin_project]
#[derive(Debug)]
pub struct ServiceStream<S> {
#[pin]
inner: S,
}
impl<S> ServiceStream<S> {
#[allow(missing_docs)]
pub fn new<K, Svc, Request>(services: S) -> Self
where
S: TryStream<Ok = Change<K, Svc>>,
K: Hash + Eq,
Svc: Service<Request>,
{
ServiceStream { inner: services }
}
}
impl<S, K, Svc> Discover for ServiceStream<S>
where
K: Hash + Eq,
S: TryStream<Ok = Change<K, Svc>>,
{
type Key = K;
type Service = Svc;
type Error = S::Error;
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
match ready!(self.project().inner.try_poll_next(cx)).transpose()? {
Some(c) => Poll::Ready(Ok(c)),
None => {
// there are no more service changes coming
Poll::Pending
}
}
}
}

View File

@ -1,12 +0,0 @@
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1
- Move to `std::future`
# 0.1.0 (unreleased)
- Initial release

View File

@ -1,35 +0,0 @@
[package]
name = "tower-filter"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-filter/0.3.0-alpha.2"
description = """
Conditionally allow requests to be dispatched to a service based on the result
of a predicate.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
publish = false
[dependencies]
tower-service = "0.3"
tower-layer = "0.3"
pin-project = "0.4"
futures-core = { version = "0.3", default-features = false }
[dev-dependencies]
tower-test = { version = "0.3", path = "../tower-test" }
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
tokio = { version = "0.2", features = ["macros", "sync"] }

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,14 +0,0 @@
# Tower Filter
Conditionally allow requests to be dispatched to a service based on the result
of a predicate.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,3 +0,0 @@
# 0.3.0 (December 4, 2019)
- Initial release

View File

@ -1,20 +0,0 @@
[package]
name = "tower-hedge"
version = "0.3.0"
authors = ["Alex Leong <adlleong@gmail.com>"]
edition = "2018"
publish = false
[dependencies]
hdrhistogram = "6.0"
log = "0.4.1"
tower-service = "0.3"
tower-filter = { version = "0.3", path = "../tower-filter" }
tokio = { version = "0.2", features = ["time"] }
futures-util = { version = "0.3", default-features = false }
pin-project = "0.4"
[dev-dependencies]
tower-test = { version = "0.3", path = "../tower-test" }
tokio-test = "0.2"
tokio = { version = "0.2", features = ["macros", "test-util"] }

View File

View File

@ -24,4 +24,4 @@ edition = "2018"
[dependencies]
[dev-dependencies]
tower-service = { version = "0.3.0", path = "../tower-service" }
tower-service = { version = "0.3.0" }

View File

@ -88,3 +88,14 @@ pub trait Layer<S> {
/// that has been decorated with the middleware.
fn layer(&self, inner: S) -> Self::Service;
}
impl<'a, T, S> Layer<S> for &'a T
where
T: ?Sized + Layer<S>,
{
type Service = T::Service;
fn layer(&self, inner: S) -> Self::Service {
(**self).layer(inner)
}
}

View File

@ -1,17 +0,0 @@
# 0.3.0 (December 19, 2019)
- Update to `futures 0.3`
- Update to `tokio 0.2`
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (April 26, 2019)
- Initial release

View File

@ -1,34 +0,0 @@
[package]
name = "tower-limit"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-limit/0.3.0"
description = """
Limit maximum request rate to a `Service`.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
futures-core = { version = "0.3", default-features = false }
tower-service = "0.3"
tower-layer = "0.3"
tokio = { version = "0.2", features = ["time"] }
pin-project = "0.4"
[dev-dependencies]
tower-test = { version = "0.3", path = "../tower-test" }
tokio-test = "0.2"
tokio = { version = "0.2", features = ["macros", "test-util"] }

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Rate Limit
Limit maximum request rate to a `Service`.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,108 +0,0 @@
use super::future::ResponseFuture;
use tower_service::Service;
use super::sync::semaphore::{self, Semaphore};
use futures_core::ready;
use std::sync::Arc;
use std::task::{Context, Poll};
/// Enforces a limit on the concurrent number of requests the underlying
/// service can handle.
#[derive(Debug)]
pub struct ConcurrencyLimit<T> {
inner: T,
limit: Limit,
}
#[derive(Debug)]
struct Limit {
semaphore: Arc<Semaphore>,
permit: semaphore::Permit,
}
impl<T> ConcurrencyLimit<T> {
/// Create a new concurrency limiter.
pub fn new(inner: T, max: usize) -> Self {
ConcurrencyLimit {
inner,
limit: Limit {
semaphore: Arc::new(Semaphore::new(max)),
permit: semaphore::Permit::new(),
},
}
}
/// Get a reference to the inner service
pub fn get_ref(&self) -> &T {
&self.inner
}
/// Get a mutable reference to the inner service
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
/// Consume `self`, returning the inner service
pub fn into_inner(self) -> T {
self.inner
}
}
impl<S, Request> Service<Request> for ConcurrencyLimit<S>
where
S: Service<Request>,
{
type Response = S::Response;
type Error = S::Error;
type Future = ResponseFuture<S::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.limit.permit.poll_acquire(cx, &self.limit.semaphore))
.expect("poll_acquire after semaphore closed ");
Poll::Ready(ready!(self.inner.poll_ready(cx)))
}
fn call(&mut self, request: Request) -> Self::Future {
// Make sure a permit has been acquired
if self
.limit
.permit
.try_acquire(&self.limit.semaphore)
.is_err()
{
panic!("max requests in-flight; poll_ready must be called first");
}
// Call the inner service
let future = self.inner.call(request);
// Forget the permit, the permit will be returned when
// `future::ResponseFuture` is dropped.
self.limit.permit.forget();
ResponseFuture::new(future, self.limit.semaphore.clone())
}
}
impl<S> Clone for ConcurrencyLimit<S>
where
S: Clone,
{
fn clone(&self) -> ConcurrencyLimit<S> {
ConcurrencyLimit {
inner: self.inner.clone(),
limit: Limit {
semaphore: self.limit.semaphore.clone(),
permit: semaphore::Permit::new(),
},
}
}
}
impl Drop for Limit {
fn drop(&mut self) {
self.permit.release(&self.semaphore);
}
}

View File

@ -1,51 +0,0 @@
#![allow(dead_code)]
use std::cell::UnsafeCell;
#[derive(Debug)]
pub(crate) struct CausalCell<T>(UnsafeCell<T>);
#[derive(Default)]
pub(crate) struct CausalCheck(());
impl<T> CausalCell<T> {
pub(crate) fn new(data: T) -> CausalCell<T> {
CausalCell(UnsafeCell::new(data))
}
pub(crate) fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(*const T) -> R,
{
f(self.0.get())
}
pub(crate) fn with_unchecked<F, R>(&self, f: F) -> R
where
F: FnOnce(*const T) -> R,
{
f(self.0.get())
}
pub(crate) fn check(&self) {}
pub(crate) fn with_deferred<F, R>(&self, f: F) -> (R, CausalCheck)
where
F: FnOnce(*const T) -> R,
{
(f(self.0.get()), CausalCheck::default())
}
pub(crate) fn with_mut<F, R>(&self, f: F) -> R
where
F: FnOnce(*mut T) -> R,
{
f(self.0.get())
}
}
impl CausalCheck {
pub(crate) fn check(self) {}
pub(crate) fn join(&mut self, _other: CausalCheck) {}
}

View File

@ -1,7 +0,0 @@
// Vendored `tokio/src/sync/semaphore.rs` and `tokio/src/sync/task/atomic_waker.rs`
// Commit sha: 24cd6d67f76f122f67cbbb101d555018fc27820b
mod cell;
mod waker;
pub(super) mod semaphore;

File diff suppressed because it is too large Load Diff

View File

@ -1,316 +0,0 @@
use super::cell::CausalCell;
use std::sync::atomic::{self, AtomicUsize};
use std::fmt;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::Waker;
/// A synchronization primitive for task waking.
///
/// `AtomicWaker` will coordinate concurrent wakes with the consumer
/// potentially "waking" the underlying task. This is useful in scenarios
/// where a computation completes in another thread and wants to wake the
/// consumer, but the consumer is in the process of being migrated to a new
/// logical task.
///
/// Consumers should call `register` before checking the result of a computation
/// and producers should call `wake` after producing the computation (this
/// differs from the usual `thread::park` pattern). It is also permitted for
/// `wake` to be called **before** `register`. This results in a no-op.
///
/// A single `AtomicWaker` may be reused for any number of calls to `register` or
/// `wake`.
pub(crate) struct AtomicWaker {
state: AtomicUsize,
waker: CausalCell<Option<Waker>>,
}
// `AtomicWaker` is a multi-consumer, single-producer transfer cell. The cell
// stores a `Waker` value produced by calls to `register` and many threads can
// race to take the waker by calling `wake.
//
// If a new `Waker` instance is produced by calling `register` before an existing
// one is consumed, then the existing one is overwritten.
//
// While `AtomicWaker` is single-producer, the implementation ensures memory
// safety. In the event of concurrent calls to `register`, there will be a
// single winner whose waker will get stored in the cell. The losers will not
// have their tasks woken. As such, callers should ensure to add synchronization
// to calls to `register`.
//
// The implementation uses a single `AtomicUsize` value to coordinate access to
// the `Waker` cell. There are two bits that are operated on independently. These
// are represented by `REGISTERING` and `WAKING`.
//
// The `REGISTERING` bit is set when a producer enters the critical section. The
// `WAKING` bit is set when a consumer enters the critical section. Neither
// bit being set is represented by `WAITING`.
//
// A thread obtains an exclusive lock on the waker cell by transitioning the
// state from `WAITING` to `REGISTERING` or `WAKING`, depending on the
// operation the thread wishes to perform. When this transition is made, it is
// guaranteed that no other thread will access the waker cell.
//
// # Registering
//
// On a call to `register`, an attempt to transition the state from WAITING to
// REGISTERING is made. On success, the caller obtains a lock on the waker cell.
//
// If the lock is obtained, then the thread sets the waker cell to the waker
// provided as an argument. Then it attempts to transition the state back from
// `REGISTERING` -> `WAITING`.
//
// If this transition is successful, then the registering process is complete
// and the next call to `wake` will observe the waker.
//
// If the transition fails, then there was a concurrent call to `wake` that
// was unable to access the waker cell (due to the registering thread holding the
// lock). To handle this, the registering thread removes the waker it just set
// from the cell and calls `wake` on it. This call to wake represents the
// attempt to wake by the other thread (that set the `WAKING` bit). The
// state is then transitioned from `REGISTERING | WAKING` back to `WAITING`.
// This transition must succeed because, at this point, the state cannot be
// transitioned by another thread.
//
// # Waking
//
// On a call to `wake`, an attempt to transition the state from `WAITING` to
// `WAKING` is made. On success, the caller obtains a lock on the waker cell.
//
// If the lock is obtained, then the thread takes ownership of the current value
// in the waker cell, and calls `wake` on it. The state is then transitioned
// back to `WAITING`. This transition must succeed as, at this point, the state
// cannot be transitioned by another thread.
//
// If the thread is unable to obtain the lock, the `WAKING` bit is still.
// This is because it has either been set by the current thread but the previous
// value included the `REGISTERING` bit **or** a concurrent thread is in the
// `WAKING` critical section. Either way, no action must be taken.
//
// If the current thread is the only concurrent call to `wake` and another
// thread is in the `register` critical section, when the other thread **exits**
// the `register` critical section, it will observe the `WAKING` bit and
// handle the waker itself.
//
// If another thread is in the `waker` critical section, then it will handle
// waking the caller task.
//
// # A potential race (is safely handled).
//
// Imagine the following situation:
//
// * Thread A obtains the `wake` lock and wakes a task.
//
// * Before thread A releases the `wake` lock, the woken task is scheduled.
//
// * Thread B attempts to wake the task. In theory this should result in the
// task being woken, but it cannot because thread A still holds the wake
// lock.
//
// This case is handled by requiring users of `AtomicWaker` to call `register`
// **before** attempting to observe the application state change that resulted
// in the task being woken. The wakers also change the application state
// before calling wake.
//
// Because of this, the task will do one of two things.
//
// 1) Observe the application state change that Thread B is waking on. In
// this case, it is OK for Thread B's wake to be lost.
//
// 2) Call register before attempting to observe the application state. Since
// Thread A still holds the `wake` lock, the call to `register` will result
// in the task waking itself and get scheduled again.
/// Idle state
const WAITING: usize = 0;
/// A new waker value is being registered with the `AtomicWaker` cell.
const REGISTERING: usize = 0b01;
/// The task currently registered with the `AtomicWaker` cell is being woken.
const WAKING: usize = 0b10;
impl AtomicWaker {
/// Create an `AtomicWaker`
pub(crate) fn new() -> AtomicWaker {
AtomicWaker {
state: AtomicUsize::new(WAITING),
waker: CausalCell::new(None),
}
}
/// Registers the current waker to be notified on calls to `wake`.
///
/// This is the same as calling `register_task` with `task::current()`.
#[cfg(feature = "io-driver")]
pub(crate) fn register(&self, waker: Waker) {
self.do_register(waker);
}
/// Registers the provided waker to be notified on calls to `wake`.
///
/// The new waker will take place of any previous wakers that were registered
/// by previous calls to `register`. Any calls to `wake` that happen after
/// a call to `register` (as defined by the memory ordering rules), will
/// wake the `register` caller's task.
///
/// It is safe to call `register` with multiple other threads concurrently
/// calling `wake`. This will result in the `register` caller's current
/// task being woken once.
///
/// This function is safe to call concurrently, but this is generally a bad
/// idea. Concurrent calls to `register` will attempt to register different
/// tasks to be woken. One of the callers will win and have its task set,
/// but there is no guarantee as to which caller will succeed.
pub(crate) fn register_by_ref(&self, waker: &Waker) {
self.do_register(waker);
}
fn do_register<W>(&self, waker: W)
where
W: WakerRef,
{
match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
WAITING => {
unsafe {
// Locked acquired, update the waker cell
self.waker.with_mut(|t| *t = Some(waker.into_waker()));
// Release the lock. If the state transitioned to include
// the `WAKING` bit, this means that a wake has been
// called concurrently, so we have to remove the waker and
// wake it.`
//
// Start by assuming that the state is `REGISTERING` as this
// is what we jut set it to.
let res = self
.state
.compare_exchange(REGISTERING, WAITING, AcqRel, Acquire);
match res {
Ok(_) => {}
Err(actual) => {
// This branch can only be reached if a
// concurrent thread called `wake`. In this
// case, `actual` **must** be `REGISTERING |
// `WAKING`.
debug_assert_eq!(actual, REGISTERING | WAKING);
// Take the waker to wake once the atomic operation has
// completed.
let waker = self.waker.with_mut(|t| (*t).take()).unwrap();
// Just swap, because no one could change state
// while state == `Registering | `Waking`
self.state.swap(WAITING, AcqRel);
// The atomic swap was complete, now
// wake the waker and return.
waker.wake();
}
}
}
}
WAKING => {
// Currently in the process of waking the task, i.e.,
// `wake` is currently being called on the old waker.
// So, we call wake on the new waker.
waker.wake();
// This is equivalent to a spin lock, so use a spin hint.
atomic::spin_loop_hint();
}
state => {
// In this case, a concurrent thread is holding the
// "registering" lock. This probably indicates a bug in the
// caller's code as racing to call `register` doesn't make much
// sense.
//
// We just want to maintain memory safety. It is ok to drop the
// call to `register`.
debug_assert!(state == REGISTERING || state == REGISTERING | WAKING);
}
}
}
/// Wakes the task that last called `register`.
///
/// If `register` has not been called yet, then this does nothing.
pub(crate) fn wake(&self) {
if let Some(waker) = self.take_waker() {
waker.wake();
}
}
/// Attempts to take the `Waker` value out of the `AtomicWaker` with the
/// intention that the caller will wake the task later.
pub(crate) fn take_waker(&self) -> Option<Waker> {
// AcqRel ordering is used in order to acquire the value of the `waker`
// cell as well as to establish a `release` ordering with whatever
// memory the `AtomicWaker` is associated with.
match self.state.fetch_or(WAKING, AcqRel) {
WAITING => {
// The waking lock has been acquired.
let waker = unsafe { self.waker.with_mut(|t| (*t).take()) };
// Release the lock
self.state.fetch_and(!WAKING, Release);
waker
}
state => {
// There is a concurrent thread currently updating the
// associated waker.
//
// Nothing more to do as the `WAKING` bit has been set. It
// doesn't matter if there are concurrent registering threads or
// not.
//
debug_assert!(
state == REGISTERING || state == REGISTERING | WAKING || state == WAKING
);
None
}
}
}
}
impl Default for AtomicWaker {
fn default() -> Self {
AtomicWaker::new()
}
}
impl fmt::Debug for AtomicWaker {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "AtomicWaker")
}
}
unsafe impl Send for AtomicWaker {}
unsafe impl Sync for AtomicWaker {}
trait WakerRef {
fn wake(self);
fn into_waker(self) -> Waker;
}
impl WakerRef for Waker {
fn wake(self) {
self.wake()
}
fn into_waker(self) -> Waker {
self
}
}
impl WakerRef for &Waker {
fn wake(self) {
self.wake_by_ref()
}
fn into_waker(self) -> Waker {
self.clone()
}
}

View File

@ -1,18 +0,0 @@
#![doc(html_root_url = "https://docs.rs/tower-limit/0.3.0")]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![allow(elided_lifetimes_in_paths)]
//! Tower middleware for limiting requests.
pub mod concurrency;
pub mod rate;
pub use crate::{
concurrency::{ConcurrencyLimit, ConcurrencyLimitLayer},
rate::{RateLimit, RateLimitLayer},
};

View File

@ -1,35 +0,0 @@
use std::time::Duration;
use tokio::time;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok};
use tower_limit::rate::RateLimitLayer;
use tower_test::{assert_request_eq, mock};
#[tokio::test]
async fn reaching_capacity() {
time::pause();
let rate_limit = RateLimitLayer::new(1, Duration::from_millis(100));
let (mut service, mut handle) = mock::spawn_layer(rate_limit);
assert_ready_ok!(service.poll_ready());
let response = service.call("hello");
assert_request_eq!(handle, "hello").send_response("world");
assert_eq!(response.await.unwrap(), "world");
assert_pending!(service.poll_ready());
assert_pending!(handle.poll_request());
time::advance(Duration::from_millis(101)).await;
assert_ready_ok!(service.poll_ready());
let response = service.call("two");
assert_request_eq!(handle, "two").send_response("done");
assert_eq!(response.await.unwrap(), "done");
}

View File

@ -1,17 +0,0 @@
# 0.3.0 (December 4, 2019)
- Update to `tower-service 0.3`
- Update to `futures 0.3`
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (April 26, 2019)
- Initial release

View File

@ -1,34 +0,0 @@
[package]
name = "tower-load-shed"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-load-shed/0.3.0"
description = """
Immediately reject requests if the inner service is not ready. This is also
known as load-shedding.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
tower-service = "0.3"
tower-layer = "0.3"
pin-project = "0.4"
futures-core = { version = "0.3", default-features = false }
[dev-dependencies]
tokio-test = "0.2"
tower-test = { version = "0.3", path = "../tower-test" }
tokio = { version = "0.2", features = ["macros"] }

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,14 +0,0 @@
# Tower Load Shed
Immediately reject requests if the inner service is not ready. This is also
known as load-shedding.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,17 +0,0 @@
# 0.3.0 (December 19, 2019)
- Update to `tower-service 0.3`
- Update to `tower-discover 0.3`
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (unreleased)
- Initial release

View File

@ -1,35 +0,0 @@
[package]
name = "tower-load"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-load/0.3.0-alpha.2"
description = """
Strategies for measuring the load of a service
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
futures-core = { version = "0.3", default-features = false }
log = "0.4.1"
tokio = { version = "0.2", features = ["time"] }
tower-service = "0.3"
tower-discover = { version = "0.3", path = "../tower-discover" }
pin-project = "0.4"
[dev-dependencies]
tokio-test = "0.2"
tokio = { version = "0.2", features = ["macros", "test-util" ] }
futures-util = { version = "0.3", default-features = false }

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Load
Provides strategies for measuring a service's load.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,86 +0,0 @@
use futures_core::ready;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Attaches `I`-typed instruments to `V` typed values.
///
/// This utility allows load metrics to have a protocol-agnostic means to track streams
/// past their initial response future. For example, if `V` represents an HTTP response
/// type, an implementation could add `H`-typed handles to each response's extensions to
/// detect when the response is dropped.
///
/// Handles are intended to be RAII guards that primarily implement `Drop` and update load
/// metric state as they are dropped.
///
/// A base `impl<H, V> Instrument<H, V> for NoInstrument` is provided to drop the handle
/// immediately. This is appropriate when a response is discrete and cannot comprise
/// multiple messages.
///
/// In many cases, the `Output` type is simply `V`. However, `Instrument` may alter the
/// type in order to instrument it appropriately. For example, an HTTP Instrument may
/// modify the body type: so an `Instrument` that takes values of type `http::Response<A>`
/// may output values of type `http::Response<B>`.
pub trait Instrument<H, V>: Clone {
/// The instrumented value type.
type Output;
/// Attaches an `H`-typed handle to a `V`-typed value.
fn instrument(&self, handle: H, value: V) -> Self::Output;
}
/// A `Instrument` implementation that drops each instrument immediately.
#[derive(Clone, Copy, Debug)]
pub struct NoInstrument;
/// Attaches a `I`-typed instruments to the result of an `F`-typed `Future`.
#[pin_project]
#[derive(Debug)]
pub struct InstrumentFuture<F, I, H> {
#[pin]
future: F,
handle: Option<H>,
instrument: I,
}
// ===== impl InstrumentFuture =====
impl<F, I, H> InstrumentFuture<F, I, H> {
/// Wraps a future, instrumenting its value if successful.
pub fn new(instrument: I, handle: H, future: F) -> Self {
InstrumentFuture {
future,
instrument,
handle: Some(handle),
}
}
}
impl<F, I, H, T, E> Future for InstrumentFuture<F, I, H>
where
F: Future<Output = Result<T, E>>,
I: Instrument<H, T>,
{
type Output = Result<I::Output, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let rsp = ready!(this.future.poll(cx))?;
let h = this.handle.take().expect("handle");
Poll::Ready(Ok(this.instrument.instrument(h, rsp)))
}
}
// ===== NoInstrument =====
impl<H, V> Instrument<H, V> for NoInstrument {
type Output = V;
fn instrument(&self, handle: H, value: V) -> V {
drop(handle);
value
}
}

View File

@ -1,31 +0,0 @@
//! Abstractions and utilties for measuring a service's load.
#![doc(html_root_url = "https://docs.rs/tower-load/0.3.0")]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![allow(elided_lifetimes_in_paths)]
mod constant;
mod instrument;
pub mod peak_ewma;
pub mod pending_requests;
pub use self::{
constant::Constant,
instrument::{Instrument, InstrumentFuture, NoInstrument},
peak_ewma::{PeakEwma, PeakEwmaDiscover},
pending_requests::{PendingRequests, PendingRequestsDiscover},
};
/// Exposes a load metric.
pub trait Load {
/// A comparable load metric. Lesser values are "preferable" to greater values.
type Metric: PartialOrd;
/// Obtains a service's load.
fn load(&self) -> Self::Metric;
}

View File

@ -1,26 +0,0 @@
# 0.3.0 (November 29, 2019)
- Update `tokio` to `0.2`.
- Rename `io` feature to `connect`.
# 0.3.0-alpha.2a (September 30, 2019)
- Update `tokio-io` to `alpha.6`
- Update `tower-service` to `alpha.2`
# 0.3.0-alpha.2 (September 20, 2019)
- Update `tokio-io` to `alpha.5`
# 0.3.0-alpha.1 (September 11, 2019)
- Bump version to match all the other crates with `std::future`
# 0.1.0-alpha.2 (August 30, 2019)
- Update `tokio-io` to `alpha.4`
# 0.1.0-alpha.1 (August 26, 2019)
- Initial release

View File

@ -1,21 +0,0 @@
[package]
name = "tower-make"
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-make/0.3.0"
description = """
Trait aliases for Services that produce specific types of Responses.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[features]
connect = ["tokio"]
[dependencies]
tower-service = "0.3"
tokio = { version = "0.2", optional = true }

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Service Makers
Trait aliases for Services that produce specific types of Responses.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,21 +0,0 @@
#![doc(html_root_url = "https://docs.rs/tower-make/0.3.0")]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
//! Trait aliases for Services that produce specific types of Responses.
#[cfg(feature = "connect")]
mod make_connection;
mod make_service;
#[cfg(feature = "connect")]
pub use crate::make_connection::MakeConnection;
pub use crate::make_service::MakeService;
mod sealed {
pub trait Sealed<T> {}
}

View File

@ -1,35 +0,0 @@
[package]
name = "tower-ready-cache"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-ready-cache/0.1.0"
description = """
Caches a set of services
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
futures-core = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
indexmap = "1.0.2"
log = "0.4.1"
tokio = { version = "0.2", features = ["sync"] }
tower-service = "0.3"
[dev-dependencies]
tower-test = { version = "0.3", path = "../tower-test" }
tokio-test = "0.2"

View File

@ -1,14 +0,0 @@
# Tower Ready Cache
Provides a set of ready services.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,11 +0,0 @@
//! A cache of services
#![doc(html_root_url = "https://docs.rs/tower-ready-cache/0.1.0")]
#![deny(missing_docs)]
#![deny(rust_2018_idioms)]
#![allow(elided_lifetimes_in_paths)]
pub mod cache;
pub mod error;
pub use self::cache::ReadyCache;

View File

@ -1,71 +0,0 @@
use tokio_test::{assert_pending, assert_ready, task};
use tower_ready_cache::ReadyCache;
use tower_test::mock;
type Req = &'static str;
type Mock = mock::Mock<Req, Req>;
#[test]
fn poll_ready_inner_failure() {
let mut task = task::spawn(());
let mut cache = ReadyCache::<usize, Mock, Req>::default();
let (service0, mut handle0) = mock::pair::<Req, Req>();
handle0.send_error("doom");
cache.push(0, service0);
let (service1, mut handle1) = mock::pair::<Req, Req>();
handle1.allow(1);
cache.push(1, service1);
let failed = assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap_err();
assert_eq!(failed.0, 0);
assert_eq!(format!("{}", failed.1), "doom");
assert_eq!(cache.len(), 1);
}
#[test]
fn poll_ready_not_ready() {
let mut task = task::spawn(());
let mut cache = ReadyCache::<usize, Mock, Req>::default();
let (service0, mut handle0) = mock::pair::<Req, Req>();
handle0.allow(0);
cache.push(0, service0);
let (service1, mut handle1) = mock::pair::<Req, Req>();
handle1.allow(0);
cache.push(1, service1);
assert_pending!(task.enter(|cx, _| cache.poll_pending(cx)));
assert_eq!(cache.ready_len(), 0);
assert_eq!(cache.pending_len(), 2);
assert_eq!(cache.len(), 2);
}
#[test]
fn poll_ready_promotes_inner() {
let mut task = task::spawn(());
let mut cache = ReadyCache::<usize, Mock, Req>::default();
let (service0, mut handle0) = mock::pair::<Req, Req>();
handle0.allow(1);
cache.push(0, service0);
let (service1, mut handle1) = mock::pair::<Req, Req>();
handle1.allow(1);
cache.push(1, service1);
assert_eq!(cache.ready_len(), 0);
assert_eq!(cache.pending_len(), 2);
assert_eq!(cache.len(), 2);
assert_ready!(task.enter(|cx, _| cache.poll_pending(cx))).unwrap();
assert_eq!(cache.ready_len(), 2);
assert_eq!(cache.pending_len(), 0);
assert_eq!(cache.len(), 2);
}

View File

@ -1,16 +0,0 @@
# 0.3.0 (December 19, 2019)
- Update to `tower-service 0.3`
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (unreleased)
- Initial release

View File

@ -1,28 +0,0 @@
[package]
name = "tower-reconnect"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-reconnect/0.3.0-alpha.2"
description = """
Automatically recreate a new `Service` instance when an error is encountered.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
log = "0.4.1"
tower-service = "0.3"
tower-make = "0.3"
pin-project = "0.4"

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Reconnect
Automatically recreate a new `Service` instance when an error is encountered.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,33 +0,0 @@
use crate::Error;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Future that resolves to the response or failure to connect.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<F> {
#[pin]
inner: F,
}
impl<F> ResponseFuture<F> {
pub(crate) fn new(inner: F) -> Self {
ResponseFuture { inner }
}
}
impl<F, T, E> Future for ResponseFuture<F>
where
F: Future<Output = Result<T, E>>,
E: Into<Error>,
{
type Output = Result<T, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx).map_err(Into::into)
}
}

View File

@ -1,19 +0,0 @@
# 0.3.0 (December 4, 2019)
- Update to `tower-service 0.3`
- Update to `tower-layer 0.3`
- Update to `tokio 0.2`
- Update to `futures-core 0.3`
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (April 26, 2019)
- Initial release

View File

@ -1,35 +0,0 @@
[package]
name = "tower-retry"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-retry/0.3.0"
description = """
Retry failed requests.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
tower-service = "0.3"
tower-layer = "0.3"
tokio = { version = "0.2", features = ["time"] }
pin-project = "0.4"
futures-core = { version = "0.3", default-features = false }
[dev-dependencies]
tower-test = { version = "0.3", path = "../tower-test" }
tokio = { version = "0.2", features = ["macros", "test-util"] }
tokio-test = "0.2"
futures-util = { version = "0.3", default-features = false }

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Retry
Retry failed requests.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -24,4 +24,4 @@ edition = "2018"
[dependencies]
[dev-dependencies]
http = "0.1"
http = "0.2"

View File

@ -1,16 +0,0 @@
# 0.3.0 (December 19, 2019)
- Update to `tower-service 0.3`
- Update to `tokio 0.2`
- Remove `Executor` usage
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (unreleased)

View File

@ -1,35 +0,0 @@
[package]
name = "tower-spawn-ready"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-spawn-ready/0.3.0-alpha.2"
description = """
Drives service readiness via a spawned task
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
futures-core = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false }
pin-project = "0.4"
tower-service = "0.3"
tower-layer = "0.3"
tokio = { version = "0.2", features = ["sync"] }
[dev-dependencies]
tower-test = { version = "0.3", path = "../tower-test" }
tokio-test = "0.2"
tokio = { version = "0.2", features = ["macros"] }

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Spawn Ready
Spawn Ready ensures that its inner service is driven to readiness on an executor. Useful with pooling layers that may poll their inner service infrequently.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,23 +0,0 @@
#![doc(html_root_url = "https://docs.rs/tower-spawn-ready/0.3.0")]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![allow(elided_lifetimes_in_paths)]
//! When an underlying service is not ready, drive it to readiness on a
//! background task.
pub mod future;
mod layer;
mod make;
mod service;
pub use crate::layer::SpawnReadyLayer;
pub use crate::make::{MakeFuture, MakeSpawnReady};
pub use crate::service::SpawnReady;
/// Errors produced by `SpawnReady`.
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;

View File

@ -24,10 +24,10 @@ edition = "2018"
[dependencies]
futures-util = { version = "0.3", default-features = false }
tokio = { version = "0.2", features = ["sync"]}
tower-layer = "0.3"
tower-layer = { version = "0.3", path = "../tower-layer" }
tokio-test = "0.2"
tower-service = "0.3"
pin-project = "0.4"
tower-service = { version = "0.3" }
pin-project = "0.4.17"
[dev-dependencies]
tokio = { version = "0.2", features = ["macros"] }
tokio = { version = "0.2", features = ["macros"] }

View File

@ -1,20 +0,0 @@
# 0.3.0 (December 1, 2019)
- Update to `tower-service 0.3`
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.1 (July 30th, 2019)
- Add `Elapsed::new`
# 0.1.0 (April 26, 2019)
- Initial release

View File

@ -1,28 +0,0 @@
[package]
name = "tower-timeout"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.3.x" git tag.
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-timeout/0.3.0"
description = """
Apply a timeout to requests, ensuring completion within a fixed time duration.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
tower-service = "0.3"
tower-layer = "0.3"
tokio = { version = "0.2", features = ["time"] }
pin-project = "0.4"

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Timeout
Apply a timeout to requests, ensuring completion within a fixed time duration.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,18 +0,0 @@
# 0.3.0 (December 19, 2019)
- Update to `tower-serivce 0.3`
- Update to `futures 0.3`
- Update to `tokio 0.2`
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (April 26, 2019)
- Initial release

View File

@ -1,41 +0,0 @@
[package]
name = "tower-util"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.3.x" git tag.
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-util/0.3.0-alpha.2"
description = """
Utilities for working with `Service`.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[features]
default = ["call-all"]
call-all = ["futures-util"]
[dependencies]
tower-service = "0.3"
pin-project = "0.4"
futures-core = { version = "0.3", default-features = false }
# Optional
futures-util = { version = "0.3", optional = true, default-features = false, features = ["alloc"] }
[dev-dependencies]
tokio-test = "0.2"
tokio = { version = "0.2", features = ["stream", "sync", "macros"] }
tower-test = { version = "0.3", path = "../tower-test" }

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Service Util
Utilities for working with `Service`.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,55 +0,0 @@
use std::{fmt, marker::PhantomData};
use futures_core::ready;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service;
/// Future yielding a `Service` once the service is ready to process a request
///
/// `Ready` values are produced by `ServiceExt::ready`.
pub struct Ready<'a, T, Request> {
inner: &'a mut T,
_p: PhantomData<fn() -> Request>,
}
// Safety: This is safe because `Services`'s are always `Unpin`.
impl<'a, T, Request> Unpin for Ready<'a, T, Request> {}
impl<'a, T, Request> Ready<'a, T, Request>
where
T: Service<Request>,
{
#[allow(missing_docs)]
pub fn new(service: &'a mut T) -> Self {
Ready {
inner: service,
_p: PhantomData,
}
}
}
impl<'a, T, Request> Future for Ready<'a, T, Request>
where
T: Service<Request>,
{
type Output = Result<(), T::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(self.inner.poll_ready(cx))?;
Poll::Ready(Ok(()))
}
}
impl<'a, T, Request> fmt::Debug for Ready<'a, T, Request>
where
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Ready").field("inner", &self.inner).finish()
}
}

View File

@ -1,3 +1,19 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
### Changed
- All middleware `tower-*` crates were merged into `tower` and placed
behind feature flags.
### Removed
# 0.3.1 (January 17, 2020)
- Allow opting out of tracing/log (#410).

View File

@ -7,7 +7,7 @@ name = "tower"
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
# - Create "vX.X.X" git tag.
version = "0.3.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
@ -24,26 +24,53 @@ keywords = ["io", "async", "non-blocking", "futures", "service"]
edition = "2018"
[features]
default = ["full", "log"]
full = []
log = ["tower-buffer/log"]
default = ["log"]
log = ["tracing/log"]
balance = ["discover", "load", "ready-cache", "make", "rand", "slab"]
buffer = ["tokio/sync", "tokio/rt-core"]
discover = []
filter = []
hedge = ["filter", "futures-util", "hdrhistogram", "tokio/time"]
limit = ["tokio/time"]
load = ["tokio/time"]
load-shed = []
make = ["tokio/io-std"]
ready-cache = ["futures-util", "indexmap", "tokio/sync"]
reconnect = ["make", "tokio/io-std"]
retry = ["tokio/time"]
spawn-ready = ["futures-util", "tokio/sync", "tokio/rt-core"]
steer = ["futures-util"]
timeout = ["tokio/time"]
util = ["futures-util"]
[dependencies]
tower-buffer = { version = "0.3", path = "../tower-buffer", default-features = false }
tower-discover = { version = "0.3", path = "../tower-discover" }
tower-layer = "0.3"
tower-limit = { version = "0.3", path = "../tower-limit" }
tower-load-shed = { version = "0.3", path = "../tower-load-shed" }
tower-retry = { version = "0.3", path = "../tower-retry" }
tower-service = "0.3"
tower-timeout = { version = "0.3", path = "../tower-timeout" }
tower-util = { version = "0.3", path = "../tower-util", features = ["call-all"] }
futures-core = { version = "0.3", default-features = false }
futures-core = "0.3"
pin-project = "0.4.17"
tower-layer = { version = "0.3", path = "../tower-layer" }
tower-service = { version = "0.3" }
tracing = "0.1.2"
futures-util = { version = "0.3", default-features = false, features = ["alloc"], optional = true }
hdrhistogram = { version = "6.0", optional = true }
indexmap = { version = "1.0.2", optional = true }
rand = { version = "0.7", features = ["small_rng"], optional = true }
slab = { version = "0.4", optional = true }
tokio = { version = "0.2", optional = true, features = ["sync"] }
[dev-dependencies]
# env_logger = { version = "0.5.3", default-features = false }
futures-util = { version = "0.3", default-features = false }
tokio = { version = "0.2", features = ["macros"] }
# log = "0.4.1"
# # tokio = "0.2"
futures-util = { version = "0.3", default-features = false, features = ["alloc", "async-await"] }
hdrhistogram = "6.0"
quickcheck = { version = "0.9", default-features = false }
tokio = { version = "0.2", features = ["macros", "stream", "sync", "test-util" ] }
tokio-test = "0.2"
tower-test = { version = "0.3", path = "../tower-test" }
tracing-subscriber = "0.1.1"
# env_logger = { version = "0.5.3", default-features = false }
# log = "0.4.1"
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[package.metadata.playground]
features = ["full"]

View File

@ -1,21 +1,22 @@
//! Exercises load balancers with mocked services.
use futures_core::TryStream;
use futures_core::{Stream, TryStream};
use futures_util::{stream, stream::StreamExt, stream::TryStreamExt};
use hdrhistogram::Histogram;
use pin_project::pin_project;
use rand::{self, Rng};
use std::hash::Hash;
use std::time::Duration;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::time::{self, Instant};
use tower::balance as lb;
use tower::discover::{Change, Discover};
use tower::limit::concurrency::ConcurrencyLimit;
use tower::load;
use tower::util::ServiceExt;
use tower_balance as lb;
use tower_discover::{Change, Discover};
use tower_limit::concurrency::ConcurrencyLimit;
use tower_load as load;
use tower_service::Service;
const REQUESTS: usize = 100_000;
@ -57,17 +58,19 @@ async fn main() {
let decay = Duration::from_secs(10);
let d = gen_disco();
let pe = lb::p2c::Balance::from_entropy(load::PeakEwmaDiscover::new(
let pe = lb::p2c::Balance::new(load::PeakEwmaDiscover::new(
d,
DEFAULT_RTT,
decay,
load::NoInstrument,
load::CompleteOnResponse::default(),
));
run("P2C+PeakEWMA...", pe).await;
let d = gen_disco();
let ll =
lb::p2c::Balance::from_entropy(load::PendingRequestsDiscover::new(d, load::NoInstrument));
let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new(
d,
load::CompleteOnResponse::default(),
));
run("P2C+LeastLoaded...", ll).await;
}
@ -78,20 +81,19 @@ type Key = usize;
#[pin_project]
struct Disco<S>(Vec<(Key, S)>);
impl<S> Discover for Disco<S>
impl<S> Stream for Disco<S>
where
S: Service<Req, Response = Rsp, Error = Error>,
{
type Key = Key;
type Service = S;
type Error = Error;
fn poll_discover(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
type Item = Result<Change<Key, S>, Error>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project().0.pop() {
Some((k, service)) => Poll::Ready(Ok(Change::Insert(k, service))),
None => Poll::Pending,
Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))),
None => {
// there may be more later
Poll::Pending
}
}
}
}
@ -132,7 +134,7 @@ async fn run<D>(name: &'static str, lb: lb::p2c::Balance<D, Req>)
where
D: Discover + Unpin + Send + 'static,
D::Error: Into<Error>,
D::Key: Clone + Send,
D::Key: Clone + Send + Hash,
D::Service: Service<Req, Response = Rsp> + load::Load + Send,
<D::Service as Service<Req>>::Error: Into<Error>,
<D::Service as Service<Req>>::Future: Send,

View File

@ -1,12 +1,10 @@
//! Error types
//! Error types for the `tower::balance` middleware.
use std::fmt;
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
/// An error returned when the balancer's endpoint discovery stream fails.
/// The balancer's endpoint discovery stream failed.
#[derive(Debug)]
pub struct Discover(pub(crate) Error);
pub struct Discover(pub(crate) crate::BoxError);
impl fmt::Display for Discover {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {

58
tower/src/balance/mod.rs Normal file
View File

@ -0,0 +1,58 @@
//! Middleware that allows balancing load among multiple services.
//!
//! In larger systems, multiple endpoints are often available for a given service. As load
//! increases, you want to ensure that that load is spread evenly across the available services.
//! Otherwise, clients could see spikes in latency if their request goes to a particularly loaded
//! service, even when spare capacity is available to handle that request elsewhere.
//!
//! This module provides two pieces of middleware that helps with this type of load balancing:
//!
//! First, [`p2c`] implements the "[Power of Two Random Choices]" algorithm, a simple but robust
//! technique for spreading load across services with only inexact load measurements. Use this if
//! the set of available services is not within your control, and you simply want to spread load
//! among that set of services.
//!
//! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
//!
//! Second, [`pool`] implements a dynamically sized pool of services. It estimates the overall
//! current load by tracking successful and unsuccessful calls to `poll_ready`, and uses an
//! exponentially weighted moving average to add (using [`tower::make_service::MakeService`]) or
//! remove (by dropping) services in response to increases or decreases in load. Use this if you
//! are able to dynamically add more service endpoints to the system to handle added load.
//!
//! # Examples
//!
//! ```rust
//! # #[cfg(feature = "util")]
//! # #[cfg(feature = "load")]
//! # fn warnings_are_errors() {
//! use tower::balance::p2c::Balance;
//! use tower::load::Load;
//! use tower::{Service, ServiceExt};
//! use futures_util::pin_mut;
//! # use futures_core::Stream;
//! # use futures_util::StreamExt;
//!
//! async fn spread<Req, S: Service<Req> + Load>(svc1: S, svc2: S, reqs: impl Stream<Item = Req>)
//! where
//! S::Error: Into<tower::BoxError>,
//! # // this bound is pretty unfortunate, and the compiler does _not_ help
//! S::Metric: std::fmt::Debug,
//! {
//! // Spread load evenly across the two services
//! let p2c = Balance::new(tower::discover::ServiceList::new(vec![svc1, svc2]));
//!
//! // Issue all the requests that come in.
//! // Some will go to svc1, some will go to svc2.
//! pin_mut!(reqs);
//! let mut responses = p2c.call_all(reqs);
//! while let Some(rsp) = responses.next().await {
//! // ...
//! }
//! }
//! # }
//! ```
pub mod error;
pub mod p2c;
pub mod pool;

View File

@ -0,0 +1,58 @@
use super::MakeBalance;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::{fmt, marker::PhantomData};
use tower_layer::Layer;
/// Construct load balancers ([`Balance`]) over dynamic service sets ([`Discover`]) produced by the
/// "inner" service in response to requests coming from the "outer" service.
///
/// This construction may seem a little odd at first glance. This is not a layer that takes
/// requests and produces responses in the traditional sense. Instead, it is more like
/// [`MakeService`](tower::make_service::MakeService) in that it takes service _descriptors_ (see
/// `Target` on `MakeService`) and produces _services_. Since [`Balance`] spreads requests across a
/// _set_ of services, the inner service should produce a [`Discover`], not just a single
/// [`Service`], given a service descriptor.
///
/// See the [module-level documentation](..) for details on load balancing.
#[derive(Clone)]
pub struct MakeBalanceLayer<D, Req> {
rng: SmallRng,
_marker: PhantomData<fn(D, Req)>,
}
impl<D, Req> MakeBalanceLayer<D, Req> {
/// Build balancers using operating system entropy.
pub fn new() -> Self {
Self {
rng: SmallRng::from_entropy(),
_marker: PhantomData,
}
}
/// Build balancers using a seed from the provided random number generator.
///
/// This may be preferrable when many balancers are initialized.
pub fn from_rng<R: Rng>(rng: &mut R) -> Result<Self, rand::Error> {
let rng = SmallRng::from_rng(rng)?;
Ok(Self {
rng,
_marker: PhantomData,
})
}
}
impl<S, Req> Layer<S> for MakeBalanceLayer<S, Req> {
type Service = MakeBalance<S, Req>;
fn layer(&self, make_discover: S) -> Self::Service {
MakeBalance::from_rng(make_discover, self.rng.clone()).expect("SmallRng is infallible")
}
}
impl<D, Req> fmt::Debug for MakeBalanceLayer<D, Req> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("MakeBalanceLayer")
.field("rng", &self.rng)
.finish()
}
}

View File

@ -1,26 +1,33 @@
use super::Balance;
use crate::error;
use crate::discover::Discover;
use futures_core::ready;
use pin_project::pin_project;
use rand::{rngs::SmallRng, SeedableRng};
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower_discover::Discover;
use tower_service::Service;
/// Makes `Balancer`s given an inner service that makes `Discover`s.
/// Constructs load balancers over dynamic service sets produced by a wrapped "inner" service.
///
/// This is effectively an implementation of [`MakeService`](tower::make_service::MakeService),
/// except that it forwards the service descriptors (`Target`) to an inner service (`S`), and
/// expects that service to produce a service set in the form of a [`Discover`]. It then wraps the
/// service set in a [`Balance`] before returning it as the "made" service.
///
/// See the [module-level documentation](..) for details on load balancing.
#[derive(Clone, Debug)]
pub struct BalanceMake<S, Req> {
pub struct MakeBalance<S, Req> {
inner: S,
rng: SmallRng,
_marker: PhantomData<fn(Req)>,
}
/// Makes a balancer instance.
/// A [`Balance`] in the making.
#[pin_project]
#[derive(Debug)]
pub struct MakeFuture<F, Req> {
@ -30,27 +37,36 @@ pub struct MakeFuture<F, Req> {
_marker: PhantomData<fn(Req)>,
}
impl<S, Req> BalanceMake<S, Req> {
pub(crate) fn new(inner: S, rng: SmallRng) -> Self {
impl<S, Req> MakeBalance<S, Req> {
/// Build balancers using operating system entropy.
pub fn new(make_discover: S) -> Self {
Self {
inner,
rng,
inner: make_discover,
rng: SmallRng::from_entropy(),
_marker: PhantomData,
}
}
/// Initializes a P2C load balancer from the OS's entropy source.
pub fn from_entropy(make_discover: S) -> Self {
Self::new(make_discover, SmallRng::from_entropy())
/// Build balancers using a seed from the provided random number generator.
///
/// This may be preferrable when many balancers are initialized.
pub fn from_rng<R: Rng>(inner: S, rng: R) -> Result<Self, rand::Error> {
let rng = SmallRng::from_rng(rng)?;
Ok(Self {
inner,
rng,
_marker: PhantomData,
})
}
}
impl<S, Target, Req> Service<Target> for BalanceMake<S, Req>
impl<S, Target, Req> Service<Target> for MakeBalance<S, Req>
where
S: Service<Target>,
S::Response: Discover,
<S::Response as Discover>::Key: Hash,
<S::Response as Discover>::Service: Service<Req>,
<<S::Response as Discover>::Service as Service<Req>>::Error: Into<error::Error>,
<<S::Response as Discover>::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
type Response = Balance<S::Response, Req>;
type Error = S::Error;
@ -73,15 +89,16 @@ impl<F, T, E, Req> Future for MakeFuture<F, Req>
where
F: Future<Output = Result<T, E>>,
T: Discover,
<T as Discover>::Key: Hash,
<T as Discover>::Service: Service<Req>,
<<T as Discover>::Service as Service<Req>>::Error: Into<error::Error>,
<<T as Discover>::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
type Output = Result<Balance<T, Req>, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let inner = ready!(this.inner.poll(cx))?;
let svc = Balance::new(inner, this.rng.clone());
let svc = Balance::from_rng(inner, this.rng.clone()).expect("SmallRng is infallible");
Poll::Ready(Ok(svc))
}
}

View File

@ -0,0 +1,40 @@
//! This module implements the "[Power of Two Random Choices]" load balancing algorithm.
//!
//! It is a simple but robust technique for spreading load across services with only inexact load
//! measurements. As its name implies, whenever a request comes in, it samples two ready services
//! at random, and issues the request to whichever service is less loaded. How loaded a service is
//! is determined by the return value of [`Load`](tower::load::Load).
//!
//! As described in the [Finagle Guide][finagle]:
//!
//! > The algorithm randomly picks two services from the set of ready endpoints and
//! > selects the least loaded of the two. By repeatedly using this strategy, we can
//! > expect a manageable upper bound on the maximum load of any server.
//! >
//! > The maximum load variance between any two servers is bound by `ln(ln(n))` where
//! > `n` is the number of servers in the cluster.
//!
//! The balance service and layer implementations rely on _service discovery_ to provide the
//! underlying set of services to balance requests across. This happens through the
//! [`Discover`](tower::discover::Discover) trait, which is essentially a `Stream` that indicates
//! when services become available or go away. If you have a fixed set of services, consider using
//! [`ServiceList`](tower::discover::ServiceList).
//!
//! Since the load balancer needs to perform _random_ choices, the constructors in this module
//! usually come in two forms: one that uses randomness provided by the operating system, and one
//! that lets you specify the random seed to use. Usually the former is what you'll want, though
//! the latter may come in handy for reproducability or to reduce reliance on the operating system.
//!
//! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
//! [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
mod layer;
mod make;
mod service;
#[cfg(test)]
mod test;
pub use layer::MakeBalanceLayer;
pub use make::{MakeBalance, MakeFuture};
pub use service::Balance;

View File

@ -1,8 +1,12 @@
use crate::error;
use super::super::error;
use crate::discover::{Change, Discover};
use crate::load::Load;
use crate::ready_cache::{error::Failed, ReadyCache};
use futures_core::ready;
use futures_util::future::{self, TryFutureExt};
use pin_project::pin_project;
use rand::{rngs::SmallRng, SeedableRng};
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
fmt,
@ -11,33 +15,25 @@ use std::{
task::{Context, Poll},
};
use tokio::sync::oneshot;
use tower_discover::{Change, Discover};
use tower_load::Load;
use tower_ready_cache::{error::Failed, ReadyCache};
use tower_service::Service;
use tracing::{debug, trace};
/// Distributes requests across inner services using the [Power of Two Choices][p2c].
/// Efficiently distributes requests across an arbitrary number of services.
///
/// As described in the [Finagle Guide][finagle]:
///
/// > The algorithm randomly picks two services from the set of ready endpoints and
/// > selects the least loaded of the two. By repeatedly using this strategy, we can
/// > expect a manageable upper bound on the maximum load of any server.
/// >
/// > The maximum load variance between any two servers is bound by `ln(ln(n))` where
/// > `n` is the number of servers in the cluster.
/// See the [module-level documentation](..) for details.
///
/// Note that `Balance` requires that the `Discover` you use is `Unpin` in order to implement
/// `Service`. This is because it needs to be accessed from `Service::poll_ready`, which takes
/// `&mut self`. You can achieve this easily by wrapping your `Discover` in [`Box::pin`] before you
/// construct the `Balance` instance. For more details, see [#319].
///
/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
/// [`Box::pin`]: https://doc.rust-lang.org/std/boxed/struct.Box.html#method.pin
/// [#319]: https://github.com/tower-rs/tower/issues/319
pub struct Balance<D: Discover, Req> {
pub struct Balance<D, Req>
where
D: Discover,
D::Key: Hash,
{
discover: D,
services: ReadyCache<D::Key, D::Service, Req>,
@ -51,7 +47,7 @@ pub struct Balance<D: Discover, Req> {
impl<D: Discover, Req> fmt::Debug for Balance<D, Req>
where
D: fmt::Debug,
D::Key: fmt::Debug,
D::Key: Hash + fmt::Debug,
D::Service: fmt::Debug,
Req: fmt::Debug,
{
@ -63,10 +59,10 @@ where
}
}
#[pin_project]
/// A Future that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e. if the service is removed from discovery.
/// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set.
#[pin_project]
#[derive(Debug)]
struct UnreadyService<K, S, Req> {
key: Option<K>,
@ -85,13 +81,14 @@ enum Error<E> {
impl<D, Req> Balance<D, Req>
where
D: Discover,
D::Key: Hash,
D::Service: Service<Req>,
<D::Service as Service<Req>>::Error: Into<error::Error>,
<D::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
/// Initializes a P2C load balancer from the provided randomization source.
pub fn new(discover: D, rng: SmallRng) -> Self {
/// Constructs a load balancer that uses operating system entropy.
pub fn new(discover: D) -> Self {
Self {
rng,
rng: SmallRng::from_entropy(),
discover,
services: ReadyCache::default(),
ready_index: None,
@ -100,9 +97,17 @@ where
}
}
/// Initializes a P2C load balancer from the OS's entropy source.
pub fn from_entropy(discover: D) -> Self {
Self::new(discover, SmallRng::from_entropy())
/// Constructs a load balancer seeded with the provided random number generator.
pub fn from_rng<R: Rng>(discover: D, rng: R) -> Result<Self, rand::Error> {
let rng = SmallRng::from_rng(rng)?;
Ok(Self {
rng,
discover,
services: ReadyCache::default(),
ready_index: None,
_req: PhantomData,
})
}
/// Returns the number of endpoints currently tracked by the balancer.
@ -114,11 +119,11 @@ where
impl<D, Req> Balance<D, Req>
where
D: Discover + Unpin,
D::Key: Clone,
D::Error: Into<error::Error>,
D::Key: Hash + Clone,
D::Error: Into<crate::BoxError>,
D::Service: Service<Req> + Load,
<D::Service as Load>::Metric: std::fmt::Debug,
<D::Service as Service<Req>>::Error: Into<error::Error>,
<D::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
/// Polls `discover` for updates, adding new items to `not_ready`.
///
@ -126,17 +131,19 @@ where
fn update_pending_from_discover(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(), error::Discover>> {
) -> Poll<Option<Result<(), error::Discover>>> {
debug!("updating from discover");
loop {
match ready!(Pin::new(&mut self.discover).poll_discover(cx))
.transpose()
.map_err(|e| error::Discover(e.into()))?
{
Change::Remove(key) => {
None => return Poll::Ready(None),
Some(Change::Remove(key)) => {
trace!("remove");
self.services.evict(&key);
}
Change::Insert(key, svc) => {
Some(Change::Insert(key, svc)) => {
trace!("insert");
// If this service already existed in the set, it will be
// replaced as the new one becomes ready.
@ -218,17 +225,17 @@ where
impl<D, Req> Service<Req> for Balance<D, Req>
where
D: Discover + Unpin,
D::Key: Clone,
D::Error: Into<error::Error>,
D::Key: Hash + Clone,
D::Error: Into<crate::BoxError>,
D::Service: Service<Req> + Load,
<D::Service as Load>::Metric: std::fmt::Debug,
<D::Service as Service<Req>>::Error: Into<error::Error>,
<D::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
type Response = <D::Service as Service<Req>>::Response;
type Error = error::Error;
type Error = crate::BoxError;
type Future = future::MapErr<
<D::Service as Service<Req>>::Future,
fn(<D::Service as Service<Req>>::Error) -> error::Error,
fn(<D::Service as Service<Req>>::Error) -> crate::BoxError,
>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {

View File

@ -1,8 +1,8 @@
use crate::discover::ServiceList;
use crate::load;
use futures_util::pin_mut;
use std::task::Poll;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
use tower_discover::ServiceList;
use tower_load as load;
use tower_test::{assert_request_eq, mock};
use super::*;
@ -11,7 +11,7 @@ use super::*;
async fn empty() {
let empty: Vec<load::Constant<mock::Mock<(), &'static str>, usize>> = vec![];
let disco = ServiceList::new(empty);
let mut svc = mock::Spawn::new(Balance::from_entropy(disco));
let mut svc = mock::Spawn::new(Balance::new(disco));
assert_pending!(svc.poll_ready());
}
@ -20,7 +20,7 @@ async fn single_endpoint() {
let (mut svc, mut handle) = mock::spawn_with(|s| {
let mock = load::Constant::new(s, 0);
let disco = ServiceList::new(vec![mock].into_iter());
Balance::from_entropy(disco)
Balance::new(disco)
});
handle.allow(0);
@ -61,7 +61,7 @@ async fn two_endpoints_with_equal_load() {
pin_mut!(handle_b);
let disco = ServiceList::new(vec![mock_a, mock_b].into_iter());
let mut svc = mock::Spawn::new(Balance::from_entropy(disco));
let mut svc = mock::Spawn::new(Balance::new(disco));
handle_a.allow(0);
handle_b.allow(0);

View File

@ -15,8 +15,10 @@
#![deny(missing_docs)]
use super::p2c::Balance;
use crate::error;
use futures_core::ready;
use crate::discover::Change;
use crate::load::Load;
use crate::make::MakeService;
use futures_core::{ready, Stream};
use pin_project::pin_project;
use slab::Slab;
use std::{
@ -25,9 +27,6 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tower_discover::{Change, Discover};
use tower_load::Load;
use tower_make::MakeService;
use tower_service::Service;
#[cfg(test)]
@ -79,21 +78,16 @@ where
}
}
impl<MS, Target, Request> Discover for PoolDiscoverer<MS, Target, Request>
impl<MS, Target, Request> Stream for PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request>,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
MS::MakeError: Into<crate::BoxError>,
MS::Error: Into<crate::BoxError>,
Target: Clone,
{
type Key = usize;
type Service = DropNotifyService<MS::Service>;
type Error = MS::MakeError;
type Item = Result<Change<usize, DropNotifyService<MS::Service>>, MS::MakeError>;
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
while let Poll::Ready(Some(sid)) = this.died_rx.as_mut().poll_recv(cx) {
@ -148,7 +142,7 @@ where
message = "finished creating new service"
);
*this.load = Level::Normal;
return Poll::Ready(Ok(Change::Insert(id, svc)));
return Poll::Ready(Some(Ok(Change::Insert(id, svc))));
}
match this.load {
@ -167,7 +161,7 @@ where
pool.services = this.services.len(),
message = "removing service for over-provisioned pool"
);
Poll::Ready(Ok(Change::Remove(rm)))
Poll::Ready(Some(Ok(Change::Remove(rm))))
}
}
}
@ -278,8 +272,8 @@ impl Builder {
MS: MakeService<Target, Request>,
MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
MS::MakeError: Into<crate::BoxError>,
MS::Error: Into<crate::BoxError>,
Target: Clone,
{
let (died_tx, died_rx) = tokio::sync::mpsc::unbounded_channel();
@ -295,7 +289,7 @@ impl Builder {
};
Pool {
balance: Balance::from_entropy(Box::pin(d)),
balance: Balance::new(Box::pin(d)),
options: *self,
ewma: self.init,
}
@ -306,8 +300,8 @@ impl Builder {
pub struct Pool<MS, Target, Request>
where
MS: MakeService<Target, Request>,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
MS::MakeError: Into<crate::BoxError>,
MS::Error: Into<crate::BoxError>,
Target: Clone,
{
// the Pin<Box<_>> here is needed since Balance requires the Service to be Unpin
@ -319,8 +313,8 @@ where
impl<MS, Target, Request> fmt::Debug for Pool<MS, Target, Request>
where
MS: MakeService<Target, Request> + fmt::Debug,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
MS::MakeError: Into<crate::BoxError>,
MS::Error: Into<crate::BoxError>,
Target: Clone + fmt::Debug,
MS::Service: fmt::Debug,
Request: fmt::Debug,
@ -339,8 +333,8 @@ where
MS: MakeService<Target, Request>,
MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
MS::MakeError: Into<crate::BoxError>,
MS::Error: Into<crate::BoxError>,
Target: Clone,
{
/// Construct a new dynamically sized `Pool`.
@ -361,8 +355,8 @@ where
MS: MakeService<Target, Req>,
MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: Into<error::Error>,
MS::Error: Into<error::Error>,
MS::MakeError: Into<crate::BoxError>,
MS::Error: Into<crate::BoxError>,
Target: Clone,
{
type Response = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Response;

View File

@ -1,6 +1,6 @@
use crate::load;
use futures_util::pin_mut;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
use tower_load as load;
use tower_test::{assert_request_eq, mock};
use super::*;

Some files were not shown because too many files have changed in this diff Show More