Compare commits

...

92 Commits

Author SHA1 Message Date
Taiki Endo b12a3e3ae9
Remove uses of pin_project::project attribute (#458)
pin-project will deprecate the project attribute due to some unfixable
limitations.

Refs: https://github.com/taiki-e/pin-project/issues/225
2020-06-15 12:38:34 -04:00
Lucio Franco 007b648ea9
Clean up readme and update status (#453) 2020-05-08 13:54:45 -04:00
Bruce Guenter 98e0e41db1
Rework ConcurrencyLimit to use upstream tokio Semaphore (#451) 2020-05-06 11:06:40 -04:00
Lucio Franco a0a66b10a2
Upgrade cargo deny action (#452) 2020-05-06 09:45:57 -04:00
Jon Gjengset 1c2d50680a
Spring cleaning for tower::balance (#449)
Noteworthy changes:

 - All constructors now follow the same pattern: `new` uses OS entropy,
   `from_rng` takes an `R: Rng` and seeds the randomness from there.
   `from_rng` is fallible, since randomness generators can be fallible
   (see the sketch after this entry).
 - `BalanceLayer` was renamed to `MakeBalanceLayer`, since it is not
   _really_ a `BalanceLayer`. The name of `BalanceMake` was also
   "normalized" to `MakeBalance`.

Another observation: the `Debug` bound on `Load::Metric` in
`p2c::Balance`, while not particularly onerous, generates really
confusing errors if you forget to include it. And crucially, the error
never points at `Debug` (should we file a compiler issue?), so I pretty
much had to guess my way to that being wrong in the doc example.

It would probably be useful to add a documentation example to
`MakeBalanceLayer` or `MakeBalance` (I suspect just one of them is fine,
since they're basically the same). Since I've never used it, and find it
hard to think of uses for it, it might be good if someone with more
experience with it wrote one.
2020-04-24 13:21:11 -04:00
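The constructor convention described in the first bullet above, as a minimal hedged sketch: the `Widget` type is hypothetical (it is not tower's `Balance`/`MakeBalance` API, whose exact signatures may differ), and it assumes the `rand` crate with the `small_rng` feature enabled.

```rust
use rand::{rngs::SmallRng, Rng, SeedableRng};

/// Hypothetical type used only to illustrate the `new`/`from_rng` convention.
struct Widget {
    rng: SmallRng,
}

impl Widget {
    /// `new` seeds the internal RNG from OS entropy and cannot fail.
    fn new() -> Self {
        Self {
            rng: SmallRng::from_entropy(),
        }
    }

    /// `from_rng` seeds from a caller-provided RNG and is fallible, since
    /// reading from another randomness source can fail.
    fn from_rng<R: Rng>(rng: &mut R) -> Result<Self, rand::Error> {
        Ok(Self {
            rng: SmallRng::from_rng(rng)?,
        })
    }
}
```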
Jon Gjengset 6a25d322b5 Use only one alias for Box<dyn Error>
This was a mostly mechanical change. I think in at least one place it
results in a `'static` bound being added, but the next tower release
will be breaking anyway, so that's okay.

I think it helps to also document the alias at the top to (eventually)
explain how people can interact with the error they get back to discover
the "deeper cause".
2020-04-24 10:30:20 -04:00
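A hedged illustration of the single alias and of how a caller can dig for the "deeper cause" mentioned above; the alias name and bounds here are the conventional ones, not necessarily the crate's exact definition.

```rust
use std::error::Error;

/// Conventional boxed-error alias (tower's actual alias may be named differently).
pub type BoxError = Box<dyn Error + Send + Sync + 'static>;

/// Walks the `source()` chain of a returned error to find its root cause.
fn root_cause(err: &(dyn Error + 'static)) -> &(dyn Error + 'static) {
    let mut current = err;
    while let Some(source) = current.source() {
        current = source;
    }
    current
}
```

Callers holding a `BoxError` can also use `downcast_ref` to check for a specific error type along the chain.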
Eliza Weisman 8752a38117
util: fix oneshot dropping pending services immediately (#447)
## Motivation

Commit #330 introduced a regression when porting `tower-util::Oneshot`
from `futures` 0.1 to `std::future`. The *intended* behavior is that a
oneshot future should repeatedly call `poll_ready` on the oneshotted
service until it is ready, and then call the service and drive the
returned future. However, #330 inadvertently changed the oneshot future
to poll the service _once_, call it if it is ready, and then drop it,
regardless of its readiness.

In the #330 version of oneshot, an `Option` is used to store the
request while waiting for the service to become ready, so that it can be
`take`n and moved into the service's `call`. However, the `Option`
contains both the request _and_ the service itself, and is taken the
first time the service is polled. `futures::ready!` is then used when
polling the service, so the method returns immediately if it is not ready.
This means that the service itself (and the request), which were taken
out of the `Option`, will be dropped, and if the oneshot future is
polled again, it will panic.

## Solution

This commit changes the `Oneshot` future so that only the request lives
in the `Option`, and it is only taken when the service is called, rather
than every time it is polled. This fixes the bug.

I've also added a test for this which fails against master, but passes
after this change.
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
2020-04-23 16:07:48 -07:00
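A minimal sketch of the fixed polling logic described above. It is not tower's actual `Oneshot` implementation: it assumes the `tower-service` crate, boxes the call future for simplicity, and adds `Unpin` bounds that the real (pin-projected) code avoids.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use tower_service::Service;

/// Drives a service to readiness, then calls it once with the stored request.
struct Oneshot<S, Req>
where
    S: Service<Req>,
{
    svc: S,
    // Only the request lives in the `Option`; the service stays in place so it
    // can be polled for readiness as many times as needed.
    req: Option<Req>,
    fut: Option<Pin<Box<S::Future>>>,
}

impl<S, Req> Future for Oneshot<S, Req>
where
    S: Service<Req> + Unpin,
    Req: Unpin,
{
    type Output = Result<S::Response, S::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        loop {
            // Once the service has been called, just drive the response future.
            if let Some(fut) = this.fut.as_mut() {
                return fut.as_mut().poll(cx);
            }
            // Otherwise keep polling readiness. Returning `Pending` here keeps
            // both the service and the request around for the next poll,
            // instead of dropping them as the regressed version did.
            match this.svc.poll_ready(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Ready(Ok(())) => {
                    let req = this.req.take().expect("polled after completion");
                    this.fut = Some(Box::pin(this.svc.call(req)));
                }
            }
        }
    }
}
```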
Steven Fackler 82e578b5b0
Impl Layer for &Layer (#446) 2020-04-21 17:11:27 -04:00
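A self-contained sketch of what "`Layer` for `&Layer`" means. It uses a local copy of the trait so the blanket impl can be written here (the orphan rule forbids doing so for the foreign trait); the real impl in `tower-layer` may differ in its exact bounds.

```rust
/// Local stand-in for `tower_layer::Layer`, only so this sketch compiles on its own.
pub trait Layer<S> {
    type Service;
    fn layer(&self, inner: S) -> Self::Service;
}

/// A shared reference to a layer is itself a layer: it delegates to the
/// referenced value, which lets layers be passed around by reference.
impl<'a, T, S> Layer<S> for &'a T
where
    T: Layer<S>,
{
    type Service = T::Service;

    fn layer(&self, inner: S) -> Self::Service {
        (**self).layer(inner)
    }
}
```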
Jon Gjengset 39112cb0ba
Tidy up tower::load (#445)
This also renames the `Instrument` trait, and related types, to better
reflect what they do. Specifically, the trait is now called
`TrackCompletion`, and `NoInstrument` is called `CompleteOnResponse`.

Also brings back balance example and makes it compile.
2020-04-20 14:55:40 -04:00
Jon Gjengset 05b165056b
Tidy up tower::buffer (#444) 2020-04-17 17:41:51 -04:00
Jon Gjengset c87fdd9c1e
Change Discover to be a sealed trait (#443)
* Change Discover to be a sealed trait

`Discover` was _really_ just a `TryStream<Item = Change>`, so this
change makes that much clearer. Specifically, users are intended to use
`Discover` only in bounds, whereas implementors should implement
`Stream` with the appropriate `Item` type. `Discover` then comes with a
blanket implementation for anything that implements `TryStream`
appropriately. This obviates the need for the `discover::stream` module.
2020-04-17 16:27:44 -04:00
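A hedged sketch of the intended usage pattern: implementors write a plain `Stream` of `Result<Change<K, S>, E>` and get `Discover` via the blanket impl. It assumes tower's `discover::Change` type (the module may be feature-gated depending on the version) and the `futures-core`/`futures-util` crates.

```rust
use std::convert::Infallible;

use futures_core::Stream;
use futures_util::stream;
use tower::discover::Change;

/// Exposes a fixed set of services as a discovery stream: each service is
/// inserted once under its index as the key, and nothing is ever removed.
fn static_discovery<S>(
    services: Vec<S>,
) -> impl Stream<Item = Result<Change<usize, S>, Infallible>> {
    stream::iter(
        services
            .into_iter()
            .enumerate()
            .map(|(key, svc)| Ok::<_, Infallible>(Change::Insert(key, svc))),
    )
}
```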
Jon Gjengset 5947e2e145
Some more spring clean fixes. (#442)
* Add doc feature annotations

* Modules should be published or removed
2020-04-17 16:03:15 -04:00
Lucio Franco 85b657bf93
Remove path deps for `tower-service` (#441) 2020-04-17 14:00:38 -04:00
Lucio Franco 5e1788f494
rate: Fix rate limit not resetting (#439) 2020-04-16 11:31:58 -04:00
Lucio Franco cd7dd12315
Refactor github actions (#436)
Signed-off-by: Lucio Franco <luciofranco14@gmail.com>
2020-04-14 19:20:20 -04:00
Lucio Franco 8a73440c1a
reconnect: Rework to allow real reconnecting (#437)
Signed-off-by: Lucio Franco <luciofranco14@gmail.com>
Co-authored-by: Jon Gjengset <jon@thesquareplanet.com>
2020-04-14 16:42:37 -04:00
Lucio Franco d34019045f
Add `Map` service combinator (#435)
Signed-off-by: Lucio Franco <luciofranco14@gmail.com>
Co-authored-by: David Barsky <dbarsky@amazon.com>
2020-04-14 15:16:16 -04:00
Akshay Narayan 0520a6a467
New sub-crate: tower-steer (#426) 2020-03-31 21:26:13 -04:00
Jon Gjengset 81cfbab19e
Merge pull request #432 from tower-rs/2020-spring-clean
2020: merge all the middleware
2020-03-31 16:55:48 -04:00
Jon Gjengset 9dd2314048
step 4: make features do the right thing 2020-03-31 16:26:53 -04:00
Jon Gjengset 2e06782241
step 3: make ci work again 2020-03-31 16:26:52 -04:00
Jon Gjengset c4d70b535b
step 2: make all the tests work again 2020-03-31 16:12:32 -04:00
Jon Gjengset 8df2a3e410
step 1: move all things to where they're going
Note that this also moves all crates from `log` to `tracing`.
It also does not set any dependencies as optional.
2020-03-31 13:31:21 -04:00
Jon Gjengset 0f9eb648a5
limit: prepare 0.3.1 release (#430) 2020-03-25 19:51:59 -04:00
Jon Gjengset 378433fc75
limit: Forward tower_load::Load (#429) 2020-03-25 19:46:05 -04:00
Jon Gjengset b575175210
util: prepare 0.3.1 release (#428) 2020-03-23 13:02:43 -04:00
Jon Gjengset 52fde9767c
util: Add ReadyAnd to do what Ready should do (#427)
* util: Add ReadyAnd to do what Ready should do

`ServiceExt::ready` says that it produces "A future yielding the service
when it is ready to accept a request." This is not true; it does _not_
yield the service when it is ready; it yields unit. This makes it
impossible to chain service ready with service call, which is sad.

This PR adds `ready_and`, which does what `ready` promised. It also
deprecates `ready` with the intention that we remove `ready` in a future
version, and make the strictly more general `ready_and` take its place.
We can't do it now since it's not a backwards-compatible change even
though it _probably_ wouldn't break any code.

The PR also updates the docs so that they reflect the observed behavior.
2020-03-23 12:49:44 -04:00
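A hedged usage sketch of the chaining that `ready_and` enables, assuming the `tower::{Service, ServiceExt}` re-exports; the deprecated `ready` would only yield `()` at the `await` point, so the call could not be chained.

```rust
use tower::{Service, ServiceExt};

/// Waits for the service to become ready, then dispatches a single request.
async fn ready_call<S, Req>(svc: &mut S, req: Req) -> Result<S::Response, S::Error>
where
    S: Service<Req>,
{
    // `ready_and` resolves to the service itself, so readiness and the call
    // chain naturally into one expression.
    svc.ready_and().await?.call(req).await
}
```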
Jon Gjengset b6f5f586c5
Add Buffer::new note on how to set bound (#425) 2020-03-04 15:48:33 -05:00
Jake Ham 52d9e95a38
Fix documentation links in README (#422)
Updated the README, fixing the links to documentation. This now links
to each package's documentation on docs.rs. Not all packages have been
released to crates.io, so their documentation pages are empty.
2020-02-27 11:42:08 -05:00
Jon Gjengset ba1fdd755b ready-cache: Prepare for 0.3.1 release
This also fixes up the various documentation URLs, which were still
pointing to 0.1.x.
2020-02-24 13:14:23 -05:00
Jon Gjengset 414e3b0809
ready-cache: Avoid panic on strange race (#420)
It's been observed that occasionally tower-ready-cache would panic
trying to find an already canceled service in `cancel_pending_txs`
(#415). The source of the race is not entirely clear, but extensive
debugging demonstrated that occasionally a call to `evict` would send on
the `CancelTx` for a service, yet that service would be yielded back
from `pending` in `poll_pending` in a non-`Canceled` state. This
is equivalent to saying that this code may panic:

```rust
async {
  let (tx, mut rx) = oneshot::channel();
  tx.send(42).unwrap();
  yield_once().await;
  rx.try_recv().unwrap(); // <- may occasionally panic
}
```

I have not been able to demonstrate a self-contained example failing in
this way, but it's the only explanation I have found for the observed
bug. Pinning the entire runtime to one core still produced the bug,
which indicates that it is not a memory ordering issue. Replacing
oneshot with `mpsc::channel(1)` still produced the bug, which indicates
that the bug is not with the implementation of `oneshot`. Logs also
indicate that the `ChannelTx` we send on in `evict()` truly is the same
one associated with the `ChannelRx` polled in `Pending::poll`, so we're
not getting our wires crossed somewhere. It truly is bizarre.

This patch resolves the issue by considering a failure to find a
ready/errored service's `CancelTx` as another signal that a service has
been removed. Specifically, if `poll_pending` finds a service that
returns `Ok` or `Err`, but does _not_ find its `CancelTx`, then it
assumes that it must be because the service _was_ canceled, but did not
observe that cancellation signal.

As an explanation, this isn't entirely satisfactory, since we do not
fully understand the underlying problem. It _may_ be that a canceled
service could remain in the pending state for a very long time if it
does not become ready _and_ does not see the cancellation signal (so it
returns `Poll::Pending` and is not removed). That, in turn, might cause
an issue if the driver of the `ReadyCache` then chooses to re-use a key
they believe they have evicted. However, any such case _must_ first hit
the panic that exists in the code today, so this is still an improvement
over the status quo.

Fixes #415.
2020-02-24 13:03:43 -05:00
Jon Gjengset be156e733d ready-cache: restore assert for dropped cancel tx
When ready-cache was upgraded from futures 0.1 to `std::future` in
e2f1a49cf3, this `expect` was removed, and
the code instead silently ignores the error. That's probably not what we
want, so this patch restores that assertion.
2020-02-20 17:08:07 -05:00
Jon Gjengset 1a67100aab Restore commented-out p2c assertion 2020-02-20 16:33:54 -05:00
Jon Gjengset ae34c9b4a1 Add more tower-ready-cache tests 2020-02-20 16:33:54 -05:00
Jon Gjengset 96529148d8 Remove irrelevant comment
The assertion there isn't even true anyway, since the p2c may not yet
have "seen" the removal of a service, because it stopped when it found a
ready service.
2020-02-20 16:01:19 -05:00
Jon Gjengset 650e5be58e balance: Add a stress test for p2c
The hope for this was to reproduce #415 (which it does not sadly), but
at least it adds a test for p2c!
2020-02-20 16:01:19 -05:00
Jon Gjengset 47c3a14560 tower: Prepare 0.3.1 release 2020-01-17 22:53:08 -05:00
Jon Gjengset ccfe7da592 tower: Allow opting out of tracing/log
This is of particular importance since the `log` feature of `tracing`
(currently) fails to compile if the `tracing` dependency is renamed.
Without a way to disable it in `tower`, any package that both depends on
`tower` **and** renames `tracing` in its dependencies is doomed.
2020-01-17 17:01:43 -05:00
Lucio Franco 7e35b758be
Remove azure and rename gh actions (#409)
* Remove azure

* Rename actions

* Rename workflow

* Reduce amount of actions

* Fix patch
2020-01-09 19:23:03 -05:00
László Nagy 40103d84ce Use GitHub actions (#407)
* gh-403: add basic github actions

* gh-403: add environment variables during test

* gh-403: fix error in tower-balance example

* gh-403: rename build workflow

* gh-403: fix release workflow

* gh-403: add GitHub page publish workflow

* gh-403: remove release workflow

* gh-403: run per crate build

* gh-403: replace build to check
2020-01-09 19:02:40 -05:00
Lucio Franco 7b48479bd2
util: Remove dev dep on tower (#401)
* util: Remove dev dep on tower

* Fix rustc bug

* enable call-all by default
2019-12-19 18:17:21 -05:00
Lucio Franco d63665515c
ready-cache: Add readme (#402) 2019-12-19 17:56:43 -05:00
Lucio Franco fe7919b1a4
Use `Into<Error>` for all Services (#400) 2019-12-19 17:30:23 -05:00
Lucio Franco 86eef82d2f
Remove default features for futures dep (#399)
* Remove default features for futures dep

* Add missing alloc feature
2019-12-19 14:20:41 -05:00
Lucio Franco 1e87d7ca8b
Bump crates and changelog dates (#397) 2019-12-19 13:44:40 -05:00
Lucio Franco 2fede40bdb
balance: Upgrade rand to 0.7 (#398) 2019-12-19 13:44:07 -05:00
Sean McArthur 2dc9a72bea tower-util: remove dead code 2019-12-11 13:13:07 -08:00
Sean McArthur 1863304331 move ServiceExt to tower-util crate 2019-12-11 12:13:51 -08:00
Lucio Franco 2e9e2d1813
limit: Vendor `tokio::sync::Semaphore` (#388) 2019-12-11 15:08:42 -05:00
Lucio Franco fd2d034e97
ci: Re-enable ci (#389)
* ci: Re-enable ci

* ci: Re-enable ci

* Actually use stable
2019-12-11 15:01:02 -05:00
Sean McArthur f6650b90c7 re-enable CI for tower-layer and tower-util 2019-12-11 11:25:13 -08:00
Sean McArthur f130e5e113 tower-util: reduce dependencies, make call-all optional 2019-12-11 11:25:13 -08:00
Juan Alvarez 1843416dfe remove service, make and layer path deps (#382) 2019-12-06 11:59:56 -05:00
Lucio Franco 423ecee7e9
Remove unused deps (#381) 2019-12-05 23:42:01 -05:00
Lucio Franco fdc7460f5a
Add rt-core feature to buffer (#380) 2019-12-05 20:17:36 -05:00
Lucio Franco e2f1a49cf3
Update the rest of the crates and upgrade ready cache to `std::f… (#379)
* Update hedge, filter, load, load-shed, and more

* Update ready cache

* Prepare release for ready-cache

* fix merge

* Update balance

* Prepare balance release
2019-12-05 14:21:47 -05:00
Lucio Franco 0d2a3778ad
Update `tower` and `tower-util` and prep for release (#378)
* Update tower and tower-util

* Prepare them for release

* fmt

* Get tower tests working
2019-12-04 22:48:43 -05:00
Lucio Franco 54dd475ec0
Update buffer and prepare for release (#377)
* Update buffer and prepare for release

* Update tower-buffer/src/service.rs

Co-Authored-By: Eliza Weisman <eliza@buoyant.io>

* fmt
2019-12-04 20:31:27 -05:00
Lucio Franco 15c58e8842
Update retry and prepare for release (#376)
* Update retry and prepare for release

* fmt
2019-12-04 19:36:46 -05:00
Lucio Franco 877c194b1b
Update tower-limit and prepare for release (#375)
* wip

* Refactor limit tests and prep for release
2019-12-04 09:53:52 -05:00
Lucio Franco ec6215fb2f
Update timeout, tower-test and reconnect (#373)
* Update timeout and prepare 0.3

* Update tower-test and prepare release

* Update lib doc path

* Update reconnect and prepare for release
2019-12-02 19:14:15 -05:00
David Barsky 45e311c2f2 layer: Prepare 0.3.0 Release (#372)
* layer: prepare 0.3.0 release

* fmt

* Update tower-layer/src/lib.rs
2019-11-29 16:09:47 -05:00
Lucio Franco b6c67182cb
make: Prepare 0.3 release and update docs (#370)
* make: Prepare 0.3 release and update docs

* rebase against origin/master + get doc tests to compile

* fmt

* fix build
2019-11-29 15:44:03 -05:00
Lucio Franco c3c6780d31
service: Update docs and prepare for 0.3 release (#369)
* service: Update docs and prepare for 0.3 release

* Update rustfmt

* Disable main tower crate
2019-11-29 11:48:08 -05:00
Lucio Franco a4cb384751
Remove v0.3.x branch note on readme (#368) 2019-11-29 11:19:15 -05:00
Lucio Franco bb5c02ca58
Disable all crates except tower-service 2019-11-29 09:23:54 -05:00
Lucio Franco a62fe875c4
Disable tower-balance from ci 2019-11-29 09:15:10 -05:00
David Barsky a4c02f5d9c Revert "get building"
186a0fb4a3
2019-11-28 15:21:27 -05:00
David Barsky 186a0fb4a3 get building 2019-11-28 15:15:41 -05:00
Lucio Franco 51a374c564 Fix up last few merge issues 2019-11-26 10:32:49 -05:00
Lucio Franco 87ad2e1cc8 Merge remote-tracking branch 'origin/master' into v0.3.x 2019-11-26 10:32:02 -05:00
Oliver Gould 7e55b7fa0b
Introduce tower-ready-cache (#303)
In #293, `balance` was refactored to manage dispatching requests over a
set of equivalent inner services that may or may not be ready.

This change extracts the core logic of managing a cache of ready
services into a dedicated crate, leaving the balance crate to deal with
node selection.
2019-11-12 09:44:16 -08:00
Oliver Gould 2d24d84e7c
Cleanup unused dependencies (#364)
I've run `cargo udeps` to discover some unused/misplaced dependencies.
2019-11-11 09:52:33 -08:00
Oliver Gould 4a4593d522
balance: Update rand to 0.7 (#363) 2019-11-09 14:30:44 -08:00
Pen Tree 52dbdda23d Expect the poll_acquire error, not return (#362)
* Expect the poll_acquire error, not return

* Remove Error in tower-limit
2019-10-31 14:06:04 -04:00
Pen Tree fac5c361a4 Fix tower-service docs (#361) 2019-10-18 17:18:55 -04:00
Lucio Franco e414b2b7d3
Prepare buffer 0.1.2 release (#360) 2019-10-11 11:39:34 -04:00
Lucio Franco 30f11bfaa2
Prepare limit 0.1.1 release (#359) 2019-10-11 11:22:14 -04:00
Lucio Franco abe5b78542
Remove tokio alpha.6 patches (#357)
* Remove tokio alpha.6 patches

* Remove ci patch
2019-09-30 21:15:26 -04:00
Lucio Franco 3bff86e28e
make: Add alpha.2a changelog 2019-09-30 20:53:39 -04:00
Lucio Franco 7fa1054892
make: Bump version to alpha.2a (#356) 2019-09-30 20:40:28 -04:00
Jon Gjengset 2653f70884 Bumps for 0.3.0-alpha.2 (#355)
* Bump all to futures-* alpha.19

* Prepare for alpha.2 release

* Make tower-service also a path dep

* Use new tokio alpha
2019-09-30 18:56:26 -04:00
Taiki Endo 03dc7069aa Update pin-project to 0.4 (#350) 2019-09-30 14:58:27 -04:00
Jon Gjengset d5b36b54a5
Re-enable all CI (#353)
CI has to run on nightly for the time being.

Also includes changes to make buffer tests more reliable.
2019-09-24 18:56:37 -04:00
Jon Gjengset 6baf381879
Consistently apply deny/warn rules (#352)
This makes all tower subcrates have the following lints as warn (rather
than allow): `missing_docs`, `rust_2018_idioms`, `unreachable_pub`, and
`missing_debug_implementations`. In addition, it consistently applies
`deny(warnings)` *only* under CI so that deprecations and macro changes in minor
version bumps in dependencies will never cause `tower` crates to stop
compiling, and so that tests can be run even if not all warnings have been
dealt with. See also https://github.com/rust-unofficial/patterns/blob/master/anti_patterns/deny-warnings.md

Note that `tower-reconnect` has the `missing_docs` lint disabled for now
since it contained _no_ documentation previously. Also note that this
patch does not add documentation to the various `new` methods, as they
are considered self-explanatory. They are instead marked as
`#[allow(missing_docs)]`.
2019-09-23 17:28:14 -04:00
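For reference, the warn-level lint set described above, written as the crate-level attributes a subcrate's `lib.rs` would carry (a sketch; the CI-only `deny(warnings)` escalation is applied separately and not shown here):

```rust
#![warn(
    missing_docs,
    rust_2018_idioms,
    unreachable_pub,
    missing_debug_implementations
)]
```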
Taiki Endo 5a561b7776 layer: remove unused dependencies (#351) 2019-09-23 09:54:08 -04:00
Sean McArthur 55b5150a89 tower-make:v0.3.0-alpha.2 2019-09-20 15:09:09 -07:00
Sean McArthur 52075f3c6f Update tower-make to tokio-io v0.2.0-alpha.5 2019-09-20 15:09:09 -07:00
Luke Steensen b86d7fb6e4 limit: Add trace log when rate limit is exceeded (#348) 2019-09-17 17:19:23 -04:00
Luke Steensen 8509ab879d Fix up broken dependencies and deprecated methods (#347)
* fix up broken dependencies and deprecated methods

* use released version of tracing-subscriber

Co-Authored-By: Lucio Franco <luciofranco14@gmail.com>
2019-09-17 15:29:11 -04:00
Mackenzie Clark f4a81d2c7d fix tower-service helloworld docs example to use new futures (#346) 2019-09-15 14:09:58 -05:00
Lucio Franco 793e2e8e94
Add a note about v0.3.x branch to the readme (#312)
* Add a note about v0.3.x branch to the readme

Signed-off-by: Lucio Franco <luciofranco14@gmail.com>

* Fix link

Signed-off-by: Lucio Franco <luciofranco14@gmail.com>
2019-08-20 23:20:01 -04:00
236 changed files with 5509 additions and 5282 deletions

.github/workflows/CI.yml (new file, 131 lines)

@ -0,0 +1,131 @@
name: CI
on:
push:
branches:
- master
pull_request: {}
jobs:
check:
# Run `cargo check` first to ensure that the pushed code at least compiles.
runs-on: ubuntu-latest
strategy:
matrix:
rust: [stable, 1.40.0]
steps:
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
profile: minimal
- name: Check
uses: actions-rs/cargo@v1
with:
command: check
args: --all --all-targets --all-features
cache-cargo-hack:
runs-on: ubuntu-latest
steps:
- name: Fetch latest release version of cargo-hack
run: |
mkdir -p .github/caching
curl -sL https://api.github.com/repos/taiki-e/cargo-hack/releases/latest | jq -r '.name' > .github/caching/cargo-hack.lock
- name: Cache cargo-hack/bin
id: cache-cargo-hack
uses: actions/cache@v1
with:
path: ${{ runner.tool_cache }}/cargo-hack/bin
key: cargo-hack-bin-${{ hashFiles('.github/caching/cargo-hack.lock') }}
- name: Install cargo-hack
if: "steps.cache-cargo-hack.outputs.cache-hit != 'true'"
uses: actions-rs/cargo@v1
with:
command: install
args: --root ${{ runner.tool_cache }}/cargo-hack --force cargo-hack
cargo-hack:
needs: cache-cargo-hack
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
- name: Fetch latest release version of cargo-hack
run: |
mkdir -p .github/caching
curl -sL https://api.github.com/repos/taiki-e/cargo-hack/releases/latest | jq -r '.name' > .github/caching/cargo-hack.lock
- name: Restore cargo-hack/bin
uses: actions/cache@v1
with:
path: ${{ runner.tool_cache }}/cargo-hack/bin
key: cargo-hack-bin-${{ hashFiles('.github/caching/cargo-hack.lock') }}
- run: echo "::add-path::${{ runner.tool_cache }}/cargo-hack/bin"
# if `cargo-hack` somehow doesn't exist after loading it from the cache,
# make *sure* it's there.
- run: cargo hack --help || { cargo install --force cargo-hack; }
- name: cargo hack check
working-directory: ${{ matrix.subcrate }}
run: cargo hack check --each-feature --no-dev-deps --all
test-versions:
# Test against the stable, beta, and nightly Rust toolchains on ubuntu-latest.
needs: check
runs-on: ubuntu-latest
strategy:
matrix:
rust: [stable, beta, nightly, 1.40.0]
steps:
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
profile: minimal
- name: Run tests
uses: actions-rs/cargo@v1
with:
command: test
args: --all --all-features
style:
# Check style.
needs: check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
components: rustfmt
profile: minimal
- name: rustfmt
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
# warnings:
# # Check for any warnings. This is informational and thus is allowed to fail.
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@master
# - uses: actions-rs/toolchain@v1
# with:
# toolchain: stable
# components: clippy
# profile: minimal
# - name: Clippy
# uses: actions-rs/clippy-check@v1
# with:
# token: ${{ secrets.GITHUB_TOKEN }}
# args: --all --all-targets --all-features -- -D warnings
deny-check:
name: cargo-deny check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: EmbarkStudios/cargo-deny-action@v1

.github/workflows/patch.toml (new file, 7 lines)

@ -0,0 +1,7 @@
# Patch dependencies to run all tests against versions of the crate in the
# repository.
[patch.crates-io]
tower = { path = "tower" }
tower-layer = { path = "tower-layer" }
tower-service = { path = "tower-service" }
tower-test = { path = "tower-test" }

.github/workflows/publish.yml (new file, 29 lines)

@ -0,0 +1,29 @@
name: Deploy API Documentation
on:
push:
branches:
- master
jobs:
publish:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v1
- name: Generate documentation
uses: actions-rs/cargo@v1
with:
command: doc
args: --workspace --no-deps
- name: Deploy documentation
if: success()
uses: crazy-max/ghaction-github-pages@v1
with:
target_branch: gh-pages
build_dir: target/doc
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}


@ -2,21 +2,7 @@
members = [
"tower",
"tower-balance",
"tower-buffer",
"tower-discover",
"tower-filter",
"tower-hedge",
"tower-layer",
"tower-limit",
"tower-load",
"tower-load-shed",
"tower-reconnect",
"tower-retry",
"tower-service",
"tower-spawn-ready",
"tower-test",
"tower-timeout",
"tower-make",
"tower-util",
]


@ -17,52 +17,11 @@ Tower aims to make it as easy as possible to build robust networking clients and
servers. It is protocol agnostic, but is designed around a request / response
pattern. If your protocol is entirely stream based, Tower may not be a good fit.
## Project Layout
Tower consists of a number of components, each of which live in their own sub
crates.
* [`tower`]: The main user facing crate that provides batteries included tower services ([docs][t-docs]).
* [`tower-service`]: The foundational traits upon which Tower is built
([docs][ts-docs]).
* [`tower-layer`]: The foundational trait to compose services together
([docs][tl-docs]).
* [`tower-balance`]: A load balancer. Load is balanced across a number of
services ([docs][tb-docs]).
* [`tower-buffer`]: A buffering middleware. If the inner service is not ready to
handle the next request, `tower-buffer` stores the request in an internal
queue ([docs][tbuf-docs]).
* [`tower-discover`]: Service discovery abstraction ([docs][td-docs]).
* [`tower-filter`]: Middleware that conditionally dispatch requests to the inner
service based on a predicate ([docs][tf-docs]).
* [`tower-limit`]: Middleware limiting the number of requests that are
processed ([docs][tlim-docs]).
* [`tower-reconnect`]: Middleware that automatically reconnects the inner
service when it becomes degraded ([docs][tre-docs]).
* [`tower-retry`]: Middleware that retries requests based on a given `Policy`
([docs][tretry-docs]).
* [`tower-test`]: Testing utilies ([docs][ttst-docs]).
* [`tower-timeout`]: Middleware that applies a timeout to requests
([docs][tt-docs]).
* [`tower-util`]: Miscellaneous additional utilities for Tower
([docs][tu-docs]).
## Status
Currently, only [`tower-service`], the foundational trait, has been released to
crates.io. The rest of the library will be following shortly.
Currently, `tower 0.3` is released on crates. We are currently working on cleaning
up the codebase and adding more documentation. You can follow our progress in
this [issue](https://github.com/tower-rs/tower/issues/431).
## License
@ -73,31 +32,3 @@ This project is licensed under the [MIT license](LICENSE).
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.
[`tower`]: tower
[t-docs]: https://tower-rs.github.io/tower/doc/tower/index.html
[`tower-service`]: tower-service
[ts-docs]: https://docs.rs/tower-service/
[`tower-layer`]: tower-layer
[tl-docs]: https://docs.rs/tower-layer/
[`tower-balance`]: tower-balance
[tb-docs]: https://tower-rs.github.io/tower/doc/tower_balance/index.html
[`tower-buffer`]: tower-buffer
[tbuf-docs]: https://tower-rs.github.io/tower/doc/tower_buffer/index.html
[`tower-discover`]: tower-discover
[td-docs]: https://tower-rs.github.io/tower/doc/tower_discover/index.html
[`tower-filter`]: tower-filter
[tf-docs]: https://tower-rs.github.io/tower/doc/tower_filter/index.html
[`tower-limit`]: tower-limit
[tlim-docs]: https://tower-rs.github.io/tower/doc/tower_limit/index.html
[`tower-reconnect`]: tower-reconnect
[tre-docs]: https://tower-rs.github.io/tower/doc/tower_reconnect/index.html
[`tower-retry`]: tower-retry
[tretry-docs]: https://tower-rs.github.io/tower/doc/tower_retry/index.html
[`tower-timeout`]: tower-timeout
[`tower-test`]: tower-test
[ttst-docs]: https://tower-rs.github.io/tower/doc/tower_test/index.html
[`tower-rate-limit`]: tower-rate-limit
[tt-docs]: https://tower-rs.github.io/tower/doc/tower_timeout/index.html
[`tower-util`]: tower-util
[tu-docs]: https://tower-rs.github.io/tower/doc/tower_util/index.html


@ -1,38 +0,0 @@
trigger: ["master", "v0.3.x"]
pr: ["master", "v0.3.x"]
jobs:
- template: ci/azure-rustfmt.yml
parameters:
name: rustfmt
# Basic test run on all platforms
- template: ci/azure-test-stable.yml
parameters:
name: Linux_Stable
displayName: Test
vmImage: ubuntu-16.04
crates:
# - tower-balance
# - tower-buffer
# - tower-discover
# - tower-filter
# - tower-hedge
# - tower-layer
# - tower-limit
# - tower-load
# - tower-load-shed
# - tower-reconnect
# - tower-retry
- tower-service
# - tower-spawn-ready
# - tower-test
# - tower-timeout
# - tower-util
# - tower
- template: ci/azure-deploy-docs.yml
parameters:
dependsOn:
- rustfmt
- Linux_Stable


@ -1,39 +0,0 @@
parameters:
dependsOn: []
jobs:
- job: documentation
displayName: 'Deploy API Documentation'
condition: and(succeeded(), eq(variables['Build.SourceBranch'], 'refs/heads/master'))
pool:
vmImage: 'Ubuntu 16.04'
dependsOn:
- ${{ parameters.dependsOn }}
steps:
- template: azure-install-rust.yml
parameters:
platform: ${{parameters.name}}
rust_version: stable
- script: |
cargo doc --all --no-deps
cp -R target/doc '$(Build.BinariesDirectory)'
displayName: 'Generate Documentation'
- script: |
set -e
git --version
ls -la
git init
git config user.name 'Deployment Bot (from Azure Pipelines)'
git config user.email 'deploy@tower-rs.com'
git config --global credential.helper 'store --file ~/.my-credentials'
printf "protocol=https\nhost=github.com\nusername=carllerche\npassword=%s\n\n" "$GITHUB_TOKEN" | git credential-store --file ~/.my-credentials store
git remote add origin https://github.com/tower-rs/tower
git checkout -b gh-pages
git add .
git commit -m 'Deploy Tower API documentation'
git push -f origin gh-pages
env:
GITHUB_TOKEN: $(githubPersonalToken)
workingDirectory: '$(Build.BinariesDirectory)'
displayName: 'Deploy Documentation'


@ -1,28 +0,0 @@
steps:
# Linux and macOS.
- script: |
set -e
curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain $RUSTUP_TOOLCHAIN
echo "##vso[task.setvariable variable=PATH;]$PATH:$HOME/.cargo/bin"
env:
RUSTUP_TOOLCHAIN: ${{parameters.rust_version}}
displayName: "Install rust (*nix)"
condition: not(eq(variables['Agent.OS'], 'Windows_NT'))
# Windows.
- script: |
echo "windows"
curl -sSf -o rustup-init.exe https://win.rustup.rs
rustup-init.exe -y --default-toolchain %RUSTUP_TOOLCHAIN%
set PATH=%PATH%;%USERPROFILE%\.cargo\bin
echo "##vso[task.setvariable variable=PATH;]%PATH%;%USERPROFILE%\.cargo\bin"
env:
RUSTUP_TOOLCHAIN: ${{parameters.rust_version}}
displayName: Install rust (windows)
condition: eq(variables['Agent.OS'], 'Windows_NT')
# All platforms.
- script: |
rustc -Vv
cargo -V
displayName: Query rust and cargo versions


@ -1,9 +0,0 @@
steps:
- bash: |
set -e
if git log --no-merges -1 --format='%B' | grep -qF '[ci-release]'; then
echo "##vso[task.setvariable variable=isRelease]true"
fi
failOnStderr: true
displayName: Check if release commit


@ -1,17 +0,0 @@
steps:
- script: |
set -e
# Remove any existing patch statements
mv Cargo.toml Cargo.toml.bck
sed -n '/\[patch.crates-io\]/q;p' Cargo.toml.bck > Cargo.toml
# Patch all crates
cat ci/patch.toml >> Cargo.toml
# Print `Cargo.toml` for debugging
echo "~~~~ Cargo.toml ~~~~"
cat Cargo.toml
echo "~~~~~~~~~~~~~~~~~~~~"
displayName: Patch Cargo.toml


@ -1,16 +0,0 @@
jobs:
# Check formatting
- job: ${{ parameters.name }}
displayName: Check rustfmt
pool:
vmImage: ubuntu-16.04
steps:
- template: azure-install-rust.yml
parameters:
rust_version: stable
- bash: |
rustup component add rustfmt
displayName: Install rustfmt
- bash: |
cargo fmt --all -- --check
displayName: Check formatting


@ -1,31 +0,0 @@
parameters:
crates: []
jobs:
- job: ${{ parameters.name }}
displayName: ${{ parameters.displayName }}
pool:
vmImage: ${{ parameters.vmImage }}
steps:
- template: azure-install-rust.yml
parameters:
rust_version: stable
- template: azure-is-release.yml
- ${{ each crate in parameters.crates }}:
- script: cargo test
env:
CI: 'True'
displayName: cargo test -p ${{ crate }}
workingDirectory: $(Build.SourcesDirectory)/${{ crate }}
condition: and(succeeded(), ne(variables['isRelease'], 'true'))
- template: azure-patch-crates.yml
- ${{ each crate in parameters.crates }}:
- script: cargo test
env:
CI: 'True'
displayName: cargo test -p ${{ crate }}
workingDirectory: $(Build.SourcesDirectory)/${{ crate }}


@ -1,18 +0,0 @@
# Patch dependencies to run all tests against versions of the crate in the
# repository.
[patch.crates-io]
tower = { path = "tower" }
tower-balance = { path = "tower-balance" }
tower-buffer = { path = "tower-buffer" }
tower-discover = { path = "tower-discover" }
tower-filter = { path = "tower-filter" }
tower-layer = { path = "tower-layer" }
tower-limit = { path = "tower-limit" }
tower-load-shed = { path = "tower-load-shed" }
tower-reconnect = { path = "tower-reconnect" }
tower-retry = { path = "tower-retry" }
tower-service = { path = "tower-service" }
tower-spawn-ready = { path = "tower-spawn-ready" }
tower-test = { path = "tower-test" }
tower-timeout = { path = "tower-timeout" }
tower-util = { path = "tower-util" }

deny.toml (new file, 25 lines)

@ -0,0 +1,25 @@
[advisories]
vulnerability = "deny"
unmaintained = "warn"
notice = "warn"
ignore = []
[licenses]
unlicensed = "deny"
allow = []
deny = []
copyleft = "warn"
allow-osi-fsf-free = "either"
confidence-threshold = 0.8
[bans]
multiple-versions = "deny"
highlight = "all"
skip-tree = [
{ name = "tower", version = "=0.3"}
]
[sources]
unknown-registry = "warn"
unknown-git = "warn"
allow-git = []


@ -1,7 +0,0 @@
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (unreleased)
- Initial release


@ -1,54 +0,0 @@
[package]
name = "tower-balance"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-balance/0.3.0-alpha.1"
description = """
Balance load across a set of uniform services.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[features]
log = ["tracing/log"]
default = ["log"]
[dependencies]
futures-util-preview = "=0.3.0-alpha.18"
futures-core-preview = "=0.3.0-alpha.18"
pin-project = "=0.4.0-alpha.11"
indexmap = "1.0.2"
tracing = "0.1"
rand = "0.6.5"
tokio-sync = "=0.2.0-alpha.4"
tokio-timer = "=0.3.0-alpha.4"
tower-discover = { version = "=0.3.0-alpha.1", path = "../tower-discover" }
tower-layer = { version = "=0.3.0-alpha.1", path = "../tower-layer" }
tower-load = { version = "=0.3.0-alpha.1", path = "../tower-load" }
tower-service = "=0.3.0-alpha.1"
tower-make = { version = "=0.3.0-alpha.1", path = "../tower-make" }
slab = "0.4"
[dev-dependencies]
tracing-subscriber = "0.1.1"
hdrhistogram = "6.0"
quickcheck = { version = "0.6", default-features = false }
tokio = "=0.2.0-alpha.4"
tokio-executor = "=0.2.0-alpha.4"
tokio-test = "=0.2.0-alpha.4"
tower-buffer = { version = "=0.3.0-alpha.1b", path = "../tower-buffer" }
tower-limit = { version = "=0.3.0-alpha.1", path = "../tower-limit" }
tower-test = { version = "=0.3.0-alpha.1", path = "../tower-test" }
tower = { version = "=0.3.0-alpha.1a", path = "../tower" }


@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.


@ -1,13 +0,0 @@
# Tower Balance
Balance load across a set of uniform services.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.


@ -1,11 +0,0 @@
//! Load balancing middlewares.
#![doc(html_root_url = "https://docs.rs/tower-balance/0.3.0-alpha.1")]
#![deny(missing_docs)]
#![deny(rust_2018_idioms)]
#![allow(elided_lifetimes_in_paths)]
#![deny(warnings)]
pub mod error;
pub mod p2c;
pub mod pool;


@ -1,48 +0,0 @@
use super::BalanceMake;
use rand::{rngs::SmallRng, FromEntropy, Rng, SeedableRng};
use std::{fmt, marker::PhantomData};
use tower_layer::Layer;
/// Efficiently distributes requests across an arbitrary number of services
#[derive(Clone)]
pub struct BalanceLayer<D, Req> {
rng: SmallRng,
_marker: PhantomData<fn(D, Req)>,
}
impl<D, Req> BalanceLayer<D, Req> {
/// Builds a balancer using the system entropy.
pub fn new() -> Self {
Self {
rng: SmallRng::from_entropy(),
_marker: PhantomData,
}
}
/// Builds a balancer from the provided RNG.
///
/// This may be preferrable when many balancers are initialized.
pub fn from_rng<R: Rng>(rng: &mut R) -> Result<Self, rand::Error> {
let rng = SmallRng::from_rng(rng)?;
Ok(Self {
rng,
_marker: PhantomData,
})
}
}
impl<S, Req> Layer<S> for BalanceLayer<S, Req> {
type Service = BalanceMake<S, Req>;
fn layer(&self, make_discover: S) -> Self::Service {
BalanceMake::new(make_discover, self.rng.clone())
}
}
impl<D, Req> fmt::Debug for BalanceLayer<D, Req> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BalanceLayer")
.field("rng", &self.rng)
.finish()
}
}


@ -1,83 +0,0 @@
use super::Balance;
use futures_core::ready;
use pin_project::pin_project;
use rand::{rngs::SmallRng, FromEntropy};
use std::marker::PhantomData;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower_discover::Discover;
use tower_service::Service;
/// Makes `Balancer`s given an inner service that makes `Discover`s.
#[derive(Clone, Debug)]
pub struct BalanceMake<S, Req> {
inner: S,
rng: SmallRng,
_marker: PhantomData<fn(Req)>,
}
#[pin_project]
/// Makes a balancer instance.
pub struct MakeFuture<F, Req> {
#[pin]
inner: F,
rng: SmallRng,
_marker: PhantomData<fn(Req)>,
}
impl<S, Req> BalanceMake<S, Req> {
pub(crate) fn new(inner: S, rng: SmallRng) -> Self {
Self {
inner,
rng,
_marker: PhantomData,
}
}
/// Initializes a P2C load balancer from the OS's entropy source.
pub fn from_entropy(make_discover: S) -> Self {
Self::new(make_discover, SmallRng::from_entropy())
}
}
impl<S, Target, Req> Service<Target> for BalanceMake<S, Req>
where
S: Service<Target>,
S::Response: Discover,
<S::Response as Discover>::Service: Service<Req>,
{
type Response = Balance<S::Response, Req>;
type Error = S::Error;
type Future = MakeFuture<S::Future, Req>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, target: Target) -> Self::Future {
MakeFuture {
inner: self.inner.call(target),
rng: self.rng.clone(),
_marker: PhantomData,
}
}
}
impl<F, T, E, Req> Future for MakeFuture<F, Req>
where
F: Future<Output = Result<T, E>>,
T: Discover,
<T as Discover>::Service: Service<Req>,
{
type Output = Result<Balance<T, Req>, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let inner = ready!(this.inner.poll(cx))?;
let svc = Balance::new(inner, this.rng.clone());
Poll::Ready(Ok(svc))
}
}


@ -1,12 +0,0 @@
//! A Power-of-Two-Choices Load Balancer
mod layer;
mod make;
mod service;
#[cfg(test)]
mod test;
pub use layer::BalanceLayer;
pub use make::{BalanceMake, MakeFuture};
pub use service::Balance;


@ -1,365 +0,0 @@
use crate::error;
use futures_core::{ready, Stream};
use futures_util::{stream, try_future, try_future::TryFutureExt};
use indexmap::IndexMap;
use pin_project::pin_project;
use rand::{rngs::SmallRng, FromEntropy};
use std::marker::PhantomData;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio_sync::oneshot;
use tower_discover::{Change, Discover};
use tower_load::Load;
use tower_service::Service;
use tracing::{debug, trace};
/// Distributes requests across inner services using the [Power of Two Choices][p2c].
///
/// As described in the [Finagle Guide][finagle]:
///
/// > The algorithm randomly picks two services from the set of ready endpoints and
/// > selects the least loaded of the two. By repeatedly using this strategy, we can
/// > expect a manageable upper bound on the maximum load of any server.
/// >
/// > The maximum load variance between any two servers is bound by `ln(ln(n))` where
/// > `n` is the number of servers in the cluster.
///
/// Note that `Balance` requires that the `Discover` you use is `Unpin` in order to implement
/// `Service`. This is because it needs to be accessed from `Service::poll_ready`, which takes
/// `&mut self`. You can achieve this easily by wrapping your `Discover` in [`Box::pin`] before you
/// construct the `Balance` instance. For more details, see [#319].
///
/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
/// [`Box::pin`]: https://doc.rust-lang.org/std/boxed/struct.Box.html#method.pin
/// [#319]: https://github.com/tower-rs/tower/issues/319
#[derive(Debug)]
pub struct Balance<D: Discover, Req> {
discover: D,
ready_services: IndexMap<D::Key, D::Service>,
unready_services: stream::FuturesUnordered<UnreadyService<D::Key, D::Service, Req>>,
cancelations: IndexMap<D::Key, oneshot::Sender<()>>,
/// Holds an index into `endpoints`, indicating the service that has been
/// chosen to dispatch the next request.
next_ready_index: Option<usize>,
rng: SmallRng,
_req: PhantomData<Req>,
}
#[pin_project]
/// A Future that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e. if the service is removed from discovery.
#[derive(Debug)]
struct UnreadyService<K, S, Req> {
key: Option<K>,
#[pin]
cancel: oneshot::Receiver<()>,
service: Option<S>,
_req: PhantomData<Req>,
}
enum Error<E> {
Inner(E),
Canceled,
}
impl<D, Req> Balance<D, Req>
where
D: Discover,
D::Service: Service<Req>,
{
/// Initializes a P2C load balancer from the provided randomization source.
pub fn new(discover: D, rng: SmallRng) -> Self {
Self {
rng,
discover,
ready_services: IndexMap::default(),
cancelations: IndexMap::default(),
unready_services: stream::FuturesUnordered::new(),
next_ready_index: None,
_req: PhantomData,
}
}
/// Initializes a P2C load balancer from the OS's entropy source.
pub fn from_entropy(discover: D) -> Self {
Self::new(discover, SmallRng::from_entropy())
}
/// Returns the number of endpoints currently tracked by the balancer.
pub fn len(&self) -> usize {
self.ready_services.len() + self.unready_services.len()
}
// XXX `pool::Pool` requires direct access to this... Not ideal.
pub(crate) fn discover_mut(&mut self) -> &mut D {
&mut self.discover
}
}
impl<D, Req> Balance<D, Req>
where
D: Discover + Unpin,
D::Key: Clone,
D::Error: Into<error::Error>,
D::Service: Service<Req> + Load,
<D::Service as Load>::Metric: std::fmt::Debug,
<D::Service as Service<Req>>::Error: Into<error::Error>,
{
/// Polls `discover` for updates, adding new items to `not_ready`.
///
/// Removals may alter the order of either `ready` or `not_ready`.
fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Discover>> {
debug!("updating from discover");
loop {
match ready!(Pin::new(&mut self.discover).poll_discover(cx))
.map_err(|e| error::Discover(e.into()))?
{
Change::Remove(key) => {
trace!("remove");
self.evict(&key)
}
Change::Insert(key, svc) => {
trace!("insert");
self.evict(&key);
self.push_unready(key, svc);
}
}
}
}
fn push_unready(&mut self, key: D::Key, svc: D::Service) {
let (tx, rx) = oneshot::channel();
self.cancelations.insert(key.clone(), tx);
self.unready_services.push(UnreadyService {
key: Some(key),
service: Some(svc),
cancel: rx,
_req: PhantomData,
});
}
fn evict(&mut self, key: &D::Key) {
// Update the ready index to account for reordering of ready.
if let Some((idx, _, _)) = self.ready_services.swap_remove_full(key) {
self.next_ready_index = self
.next_ready_index
.and_then(|i| Self::repair_index(i, idx, self.ready_services.len()));
debug_assert!(!self.cancelations.contains_key(key));
} else if let Some(cancel) = self.cancelations.swap_remove(key) {
let _ = cancel.send(());
}
}
fn poll_unready(&mut self, cx: &mut Context<'_>) {
loop {
match Pin::new(&mut self.unready_services).poll_next(cx) {
Poll::Pending | Poll::Ready(None) => return,
Poll::Ready(Some(Ok((key, svc)))) => {
trace!("endpoint ready");
let _cancel = self.cancelations.swap_remove(&key);
debug_assert!(_cancel.is_some(), "missing cancelation");
self.ready_services.insert(key, svc);
}
Poll::Ready(Some(Err((key, Error::Canceled)))) => {
debug_assert!(!self.cancelations.contains_key(&key))
}
Poll::Ready(Some(Err((key, Error::Inner(e))))) => {
let error = e.into();
debug!({ %error }, "dropping failed endpoint");
let _cancel = self.cancelations.swap_remove(&key);
debug_assert!(_cancel.is_some());
}
}
}
}
// Returns the updated index of `orig_idx` after the entry at `rm_idx` was
// swap-removed from an IndexMap with `orig_sz` items.
//
// If `orig_idx` is the same as `rm_idx`, None is returned to indicate that
// index cannot be repaired.
fn repair_index(orig_idx: usize, rm_idx: usize, new_sz: usize) -> Option<usize> {
debug_assert!(orig_idx <= new_sz && rm_idx <= new_sz);
let repaired = match orig_idx {
i if i == rm_idx => None, // removed
i if i == new_sz => Some(rm_idx), // swapped
i => Some(i), // uneffected
};
trace!(
{ next.idx = orig_idx, removed.idx = rm_idx, length = new_sz, repaired.idx = ?repaired },
"repairing index"
);
repaired
}
/// Performs P2C on inner services to find a suitable endpoint.
fn p2c_next_ready_index(&mut self) -> Option<usize> {
match self.ready_services.len() {
0 => None,
1 => Some(0),
len => {
// Get two distinct random indexes (in a random order) and
// compare the loads of the service at each index.
let idxs = rand::seq::index::sample(&mut self.rng, len, 2);
let aidx = idxs.index(0);
let bidx = idxs.index(1);
debug_assert_ne!(aidx, bidx, "random indices must be distinct");
let aload = self.ready_index_load(aidx);
let bload = self.ready_index_load(bidx);
let ready = if aload <= bload { aidx } else { bidx };
trace!({ a.idx = aidx, a.load = ?aload, b.idx = bidx, b.load = ?bload, ready = ?ready }, "choosing by load");
Some(ready)
}
}
}
/// Accesses a ready endpoint by index and returns its current load.
fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
let (_, svc) = self.ready_services.get_index(index).expect("invalid index");
svc.load()
}
fn poll_ready_index_or_evict(
&mut self,
cx: &mut Context<'_>,
index: usize,
) -> Poll<Result<(), ()>> {
let (_, svc) = self
.ready_services
.get_index_mut(index)
.expect("invalid index");
match svc.poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Pending => {
// became unready; so move it back there.
let (key, svc) = self
.ready_services
.swap_remove_index(index)
.expect("invalid ready index");
self.push_unready(key, svc);
Poll::Pending
}
Poll::Ready(Err(e)) => {
// failed, so drop it.
let error = e.into();
debug!({ %error }, "evicting failed endpoint");
self.ready_services
.swap_remove_index(index)
.expect("invalid ready index");
Poll::Ready(Err(()))
}
}
}
}
impl<D, Req> Service<Req> for Balance<D, Req>
where
D: Discover + Unpin,
D::Key: Clone,
D::Error: Into<error::Error>,
D::Service: Service<Req> + Load,
<D::Service as Load>::Metric: std::fmt::Debug,
<D::Service as Service<Req>>::Error: Into<error::Error>,
{
type Response = <D::Service as Service<Req>>::Response;
type Error = error::Error;
type Future = try_future::MapErr<
<D::Service as Service<Req>>::Future,
fn(<D::Service as Service<Req>>::Error) -> error::Error,
>;
/// Prepares the balancer to process a request.
///
/// When `Async::Ready` is returned, `ready_index` is set with a valid index
/// into `ready` referring to a `Service` that is ready to disptach a request.
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// First and foremost, process discovery updates. This removes or updates a
// previously-selected `ready_index` if appropriate.
let _ = self.poll_discover(cx)?;
// Drive new or busy services to readiness.
self.poll_unready(cx);
trace!({ nready = self.ready_services.len(), nunready = self.unready_services.len() }, "poll_ready");
loop {
// If a node has already been selected, ensure that it is ready.
// This ensures that the underlying service is ready immediately
// before a request is dispatched to it. If, e.g., a failure
// detector has changed the state of the service, it may be evicted
// from the ready set so that P2C can be performed again.
if let Some(index) = self.next_ready_index {
trace!({ next.idx = index }, "preselected ready_index");
debug_assert!(index < self.ready_services.len());
if let Poll::Ready(Ok(())) = self.poll_ready_index_or_evict(cx, index) {
return Poll::Ready(Ok(()));
}
self.next_ready_index = None;
}
self.next_ready_index = self.p2c_next_ready_index();
if self.next_ready_index.is_none() {
debug_assert!(self.ready_services.is_empty());
return Poll::Pending;
}
}
}
fn call(&mut self, request: Req) -> Self::Future {
let index = self.next_ready_index.take().expect("not ready");
let (key, mut svc) = self
.ready_services
.swap_remove_index(index)
.expect("invalid ready index");
// no need to repair since the ready_index has been cleared.
let fut = svc.call(request);
self.push_unready(key, svc);
fut.map_err(Into::into)
}
}
impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
type Output = Result<(K, S), (K, Error<S::Error>)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(Ok(())) = this.cancel.poll(cx) {
let key = this.key.take().expect("polled after ready");
return Poll::Ready(Err((key, Error::Canceled)));
}
let res = ready!(this
.service
.as_mut()
.expect("poll after ready")
.poll_ready(cx));
let key = this.key.take().expect("polled after ready");
let svc = this.service.take().expect("polled after ready");
match res {
Ok(()) => Poll::Ready(Ok((key, svc))),
Err(e) => Poll::Ready(Err((key, Error::Inner(e)))),
}
}
}


@ -1,122 +0,0 @@
use futures_util::pin_mut;
use std::{future::Future, task::Poll};
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
use tower_discover::ServiceList;
use tower_load as load;
use tower_service::Service;
use tower_test::{assert_request_eq, mock};
use super::*;
#[test]
fn empty() {
task::mock(|cx| {
let empty: Vec<load::Constant<mock::Mock<(), &'static str>, usize>> = vec![];
let disco = ServiceList::new(empty);
let mut svc = Balance::from_entropy(disco);
assert_pending!(svc.poll_ready(cx));
});
}
#[test]
fn single_endpoint() {
task::mock(|cx| {
let (mock, handle) = mock::pair();
pin_mut!(handle);
let mock = load::Constant::new(mock, 0);
let disco = ServiceList::new(vec![mock].into_iter());
let mut svc = Balance::from_entropy(disco);
handle.allow(0);
assert_pending!(svc.poll_ready(cx));
assert_eq!(svc.len(), 1, "balancer must have discovered endpoint");
handle.allow(1);
assert_ready_ok!(svc.poll_ready(cx));
let fut = svc.call(());
pin_mut!(fut);
assert_request_eq!(handle, ()).send_response(1);
assert_eq!(assert_ready_ok!(fut.poll(cx)), 1);
handle.allow(1);
assert_ready_ok!(svc.poll_ready(cx));
handle.send_error("endpoint lost");
assert_pending!(svc.poll_ready(cx));
assert!(svc.len() == 0, "balancer must drop failed endpoints");
});
}
#[test]
fn two_endpoints_with_equal_load() {
task::mock(|cx| {
let (mock_a, handle_a) = mock::pair();
let (mock_b, handle_b) = mock::pair();
let mock_a = load::Constant::new(mock_a, 1);
let mock_b = load::Constant::new(mock_b, 1);
pin_mut!(handle_a);
pin_mut!(handle_b);
let disco = ServiceList::new(vec![mock_a, mock_b].into_iter());
let mut svc = Balance::from_entropy(disco);
handle_a.allow(0);
handle_b.allow(0);
assert_pending!(svc.poll_ready(cx));
assert_eq!(svc.len(), 2, "balancer must have discovered both endpoints");
handle_a.allow(1);
handle_b.allow(0);
assert_ready_ok!(
svc.poll_ready(cx),
"must be ready when one of two services is ready"
);
{
let fut = svc.call(());
pin_mut!(fut);
assert_request_eq!(handle_a, ()).send_response("a");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "a");
}
handle_a.allow(0);
handle_b.allow(1);
assert_ready_ok!(
svc.poll_ready(cx),
"must be ready when both endpoints are ready"
);
{
let fut = svc.call(());
pin_mut!(fut);
assert_request_eq!(handle_b, ()).send_response("b");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "b");
}
handle_a.allow(1);
handle_b.allow(1);
for _ in 0..2 {
assert_ready_ok!(
svc.poll_ready(cx),
"must be ready when both endpoints are ready"
);
let fut = svc.call(());
pin_mut!(fut);
for (ref mut h, c) in &mut [(&mut handle_a, "a"), (&mut handle_b, "b")] {
if let Poll::Ready(Some((_, tx))) = h.as_mut().poll_request(cx) {
tracing::info!("using {}", c);
tx.send_response(c);
h.allow(0);
}
}
assert_ready_ok!(fut.as_mut().poll(cx));
}
handle_a.send_error("endpoint lost");
assert_pending!(svc.poll_ready(cx));
assert_eq!(svc.len(), 1, "balancer must drop failed endpoints",);
});
}


@ -1,200 +0,0 @@
use futures_util::pin_mut;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
use tower_load as load;
use tower_service::Service;
use tower_test::{assert_request_eq, mock};
use super::*;
#[test]
fn basic() {
task::mock(|cx| {
// start the pool
let (mock, handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
pin_mut!(handle);
let mut pool = Builder::new().build(mock, ());
assert_pending!(pool.poll_ready(cx));
// give the pool a backing service
let (svc1_m, svc1) = mock::pair();
pin_mut!(svc1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
assert_ready_ok!(pool.poll_ready(cx));
// send a request to the one backing service
let fut = pool.call(());
pin_mut!(fut);
assert_pending!(fut.as_mut().poll(cx));
assert_request_eq!(svc1, ()).send_response("foobar");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "foobar");
});
}
#[test]
fn high_load() {
task::mock(|cx| {
// start the pool
let (mock, handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
pin_mut!(handle);
let mut pool = Builder::new()
.urgency(1.0) // so _any_ Pending will add a service
.underutilized_below(0.0) // so no Ready will remove a service
.max_services(Some(2))
.build(mock, ());
assert_pending!(pool.poll_ready(cx));
// give the pool a backing service
let (svc1_m, svc1) = mock::pair();
pin_mut!(svc1);
svc1.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
assert_ready_ok!(pool.poll_ready(cx));
// make the one backing service not ready
let fut1 = pool.call(());
pin_mut!(fut1);
// if we poll_ready again, pool should notice that load is increasing
// since urgency == 1.0, it should immediately enter high load
assert_pending!(pool.poll_ready(cx));
// it should ask the maker for another service, so we give it one
let (svc2_m, svc2) = mock::pair();
pin_mut!(svc2);
svc2.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
// the pool should now be ready again for one more request
assert_ready_ok!(pool.poll_ready(cx));
let fut2 = pool.call(());
pin_mut!(fut2);
assert_pending!(pool.poll_ready(cx));
// the pool should _not_ try to add another service
// sicen we have max_services(2)
assert_pending!(handle.as_mut().poll_request(cx));
// let see that each service got one request
assert_request_eq!(svc1, ()).send_response("foo");
assert_request_eq!(svc2, ()).send_response("bar");
assert_eq!(assert_ready_ok!(fut1.poll(cx)), "foo");
assert_eq!(assert_ready_ok!(fut2.poll(cx)), "bar");
});
}
#[test]
fn low_load() {
task::mock(|cx| {
// start the pool
let (mock, handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
pin_mut!(handle);
let mut pool = Builder::new()
.urgency(1.0) // so any event will change the service count
.build(mock, ());
assert_pending!(pool.poll_ready(cx));
// give the pool a backing service
let (svc1_m, svc1) = mock::pair();
pin_mut!(svc1);
svc1.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
assert_ready_ok!(pool.poll_ready(cx));
// cycling a request should now work
let fut = pool.call(());
pin_mut!(fut);
assert_request_eq!(svc1, ()).send_response("foo");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "foo");
// and pool should now not be ready (since svc1 isn't ready)
// it should immediately try to add another service
// which we give it
assert_pending!(pool.poll_ready(cx));
let (svc2_m, svc2) = mock::pair();
pin_mut!(svc2);
svc2.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
// pool is now ready
// which (because of urgency == 1.0) should immediately cause it to drop a service
// it'll drop svc1, so it'll still be ready
assert_ready_ok!(pool.poll_ready(cx));
// and even after another poll_ready, it won't drop svc2 since it's now the only service
assert_ready_ok!(pool.poll_ready(cx));
// cycling a request should now work on svc2
let fut = pool.call(());
pin_mut!(fut);
assert_request_eq!(svc2, ()).send_response("foo");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "foo");
// and again (still svc2)
svc2.allow(1);
assert_ready_ok!(pool.poll_ready(cx));
let fut = pool.call(());
pin_mut!(fut);
assert_request_eq!(svc2, ()).send_response("foo");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "foo");
});
}
#[test]
fn failing_service() {
task::mock(|cx| {
// start the pool
let (mock, handle) =
mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
pin_mut!(handle);
let mut pool = Builder::new()
.urgency(1.0) // so _any_ Pending will add a service
.underutilized_below(0.0) // so no Ready will remove a service
.build(mock, ());
assert_pending!(pool.poll_ready(cx));
// give the pool a backing service
let (svc1_m, svc1) = mock::pair();
pin_mut!(svc1);
svc1.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
assert_ready_ok!(pool.poll_ready(cx));
// one request-response cycle
let fut = pool.call(());
pin_mut!(fut);
assert_request_eq!(svc1, ()).send_response("foo");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "foo");
// now make svc1 fail, so it has to be removed
svc1.send_error("ouch");
// polling now should recognize the failed service,
// try to create a new one, and then realize the maker isn't ready
assert_pending!(pool.poll_ready(cx));
// then we release another service
let (svc2_m, svc2) = mock::pair();
pin_mut!(svc2);
svc2.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
// the pool should now be ready again
assert_ready_ok!(pool.poll_ready(cx));
// and a cycle should work (and go through svc2)
let fut = pool.call(());
pin_mut!(fut);
assert_request_eq!(svc2, ()).send_response("bar");
assert_eq!(assert_ready_ok!(fut.poll(cx)), "bar");
});
}

View File

@ -1,19 +0,0 @@
# 0.3.0-alpha.1b (September 13, 2019)
- Remove unused `Stream` warning.
# 0.3.0-alpha.1a (September 13, 2019)
- Fix `poll_next` not existing.
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.1 (July 19, 2019)
- Add `tracing` support
# 0.1.0 (April 26, 2019)
- Initial release

View File

@ -1,40 +0,0 @@
[package]
name = "tower-buffer"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0-alpha.1b"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-buffer/0.3.0-alpha.1"
description = """
Buffer requests before dispatching to a `Service`.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[features]
log = ["tracing/log"]
default = ["log"]
[dependencies]
futures-core-preview = "=0.3.0-alpha.18"
pin-project = { version = "=0.4.0-alpha.11", features = ["project_attr"] }
tower-service = "=0.3.0-alpha.1"
tower-layer = { version = "=0.3.0-alpha.1", path = "../tower-layer" }
tokio-executor = "=0.2.0-alpha.4"
tokio-sync = "=0.2.0-alpha.4"
tracing = "0.1.2"
[dev-dependencies]
tower-test = { version = "=0.3.0-alpha.1", path = "../tower-test" }
tokio-test = { version = "=0.2.0-alpha.4" }
futures-util-preview = "=0.3.0-alpha.18"

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Buffer
Buffer requests before dispatching to a `Service`.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,57 +0,0 @@
use crate::{error::Error, service::Buffer, worker::WorkerExecutor};
use std::{fmt, marker::PhantomData};
use tokio_executor::DefaultExecutor;
use tower_layer::Layer;
use tower_service::Service;
/// Buffer requests with a bounded buffer
pub struct BufferLayer<Request, E = DefaultExecutor> {
bound: usize,
executor: E,
_p: PhantomData<fn(Request)>,
}
impl<Request> BufferLayer<Request, DefaultExecutor> {
pub fn new(bound: usize) -> Self {
BufferLayer {
bound,
executor: DefaultExecutor::current(),
_p: PhantomData,
}
}
}
impl<Request, E: Clone> BufferLayer<Request, E> {
pub fn with_executor(bound: usize, executor: E) -> Self {
BufferLayer {
bound,
executor,
_p: PhantomData,
}
}
}
impl<E, S, Request> Layer<S> for BufferLayer<Request, E>
where
S: Service<Request>,
S::Error: Into<Error>,
E: WorkerExecutor<S, Request> + Clone,
{
type Service = Buffer<S, Request>;
fn layer(&self, service: S) -> Self::Service {
Buffer::with_executor(service, self.bound, &mut self.executor.clone())
}
}
impl<Request, E> fmt::Debug for BufferLayer<Request, E>
where
// Require E: Debug in case we want to print the executor at a later date
E: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BufferLayer")
.field("bound", &self.bound)
.finish()
}
}

View File

@ -1,21 +0,0 @@
#![doc(html_root_url = "https://docs.rs/tower-buffer/0.3.0-alpha.1a")]
#![deny(rust_2018_idioms)]
#![allow(elided_lifetimes_in_paths)]
//! Buffer requests when the inner service is out of capacity.
//!
//! Buffering works by spawning a new task that is dedicated to pulling requests
//! out of the buffer and dispatching them to the inner service. By adding a
//! buffer and a dedicated task, the `Buffer` layer in front of the service can
//! be `Clone` even if the inner service is not.
pub mod error;
pub mod future;
mod layer;
mod message;
mod service;
mod worker;
pub use crate::layer::BufferLayer;
pub use crate::service::Buffer;
pub use crate::worker::WorkerExecutor;
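
The module docs above describe the core trick: a dedicated worker drains a bounded channel, so the handle in front of the service can be `Clone` even when the inner service is not. As a hedged, std-only sketch of that shape (not this crate's actual implementation, which uses an async channel and an executor-spawned `Worker`):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // A cloneable sender in front of a dedicated worker: the same pattern the
    // crate docs describe, shown here with blocking std primitives.
    let (tx, rx) = mpsc::sync_channel::<(String, mpsc::Sender<String>)>(8);

    // The worker owns the single, possibly non-cloneable inner "service".
    thread::spawn(move || {
        for (request, reply) in rx {
            let _ = reply.send(format!("handled {}", request));
        }
    });

    // Callers only hold clones of the bounded sender, so the front end is
    // trivially Clone and applies backpressure once 8 requests are queued.
    let tx2 = tx.clone();
    let (reply_tx, reply_rx) = mpsc::channel();
    tx2.send(("ping".to_string(), reply_tx)).unwrap();
    assert_eq!(reply_rx.recv().unwrap(), "handled ping");
}
```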

View File

@ -1,136 +0,0 @@
use crate::{
error::{Error, SpawnError},
future::ResponseFuture,
message::Message,
worker::{Handle, Worker, WorkerExecutor},
};
use futures_core::ready;
use std::task::{Context, Poll};
use tokio_executor::DefaultExecutor;
use tokio_sync::{mpsc, oneshot};
use tower_service::Service;
/// Adds a buffer in front of an inner service.
///
/// See crate level documentation for more details.
pub struct Buffer<T, Request>
where
T: Service<Request>,
{
tx: mpsc::Sender<Message<Request, T::Future>>,
worker: Option<Handle>,
}
impl<T, Request> Buffer<T, Request>
where
T: Service<Request>,
T::Error: Into<Error>,
{
/// Creates a new `Buffer` wrapping `service`.
///
/// `bound` gives the maximal number of requests that can be queued for the service before
/// backpressure is applied to callers.
///
/// The default Tokio executor is used to run the given service, which means that this method
/// must be called while on the Tokio runtime.
pub fn new(service: T, bound: usize) -> Self
where
T: Send + 'static,
T::Future: Send,
T::Error: Send + Sync,
Request: Send + 'static,
{
Self::with_executor(service, bound, &mut DefaultExecutor::current())
}
/// Creates a new `Buffer` wrapping `service`.
///
/// `executor` is used to spawn a new `Worker` task that is dedicated to
/// draining the buffer and dispatching the requests to the internal
/// service.
///
/// `bound` gives the maximal number of requests that can be queued for the service before
/// backpressure is applied to callers.
pub fn with_executor<E>(service: T, bound: usize, executor: &mut E) -> Self
where
E: WorkerExecutor<T, Request>,
{
let (tx, rx) = mpsc::channel(bound);
let worker = Worker::spawn(service, rx, executor);
Buffer { tx, worker }
}
fn get_worker_error(&self) -> Error {
self.worker
.as_ref()
.map(|w| w.get_error_on_closed())
.unwrap_or_else(|| {
// If there's no worker handle, that's because spawning it
// at the beginning failed.
SpawnError::new().into()
})
}
}
impl<T, Request> Service<Request> for Buffer<T, Request>
where
T: Service<Request>,
T::Error: Into<Error>,
{
type Response = T::Response;
type Error = Error;
type Future = ResponseFuture<T::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// If the inner service has errored, then we error here.
if let Err(_) = ready!(self.tx.poll_ready(cx)) {
Poll::Ready(Err(self.get_worker_error()))
} else {
Poll::Ready(Ok(()))
}
}
fn call(&mut self, request: Request) -> Self::Future {
// TODO:
// ideally we'd poll_ready again here so we don't allocate the oneshot
// if the try_send is about to fail, but sadly we can't call poll_ready
// outside of task context.
let (tx, rx) = oneshot::channel();
// get the current Span so that we can explicitly propagate it to the worker
// if we didn't do this, events on the worker related to this span wouldn't be counted
// towards that span since the worker would have no way of entering it.
let span = tracing::Span::current();
tracing::trace!(parent: &span, "sending request to buffer worker");
match self.tx.try_send(Message { request, span, tx }) {
Err(e) => {
if e.is_closed() {
ResponseFuture::failed(self.get_worker_error())
} else {
// When `mpsc::Sender::poll_ready` returns `Ready`, a slot
// in the channel is reserved for the handle. Other `Sender`
// handles may not send a message using that slot. This
// guarantees capacity for `request`.
//
// Given this, the only way to hit this code path is if the caller
// did not first call `poll_ready` and receive `Ready`.
panic!("buffer full; poll_ready must be called first");
}
}
Ok(_) => ResponseFuture::new(rx),
}
}
}
impl<T, Request> Clone for Buffer<T, Request>
where
T: Service<Request>,
{
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
worker: self.worker.clone(),
}
}
}
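
The comments in `call` above lean on the `Service` contract: a `Ready` from `poll_ready` reserves a channel slot that `call` then consumes. A hedged, generic helper showing the required sequence (assuming only `tower_service::Service` and `futures_util::future::poll_fn`; `ready_call` is an illustrative name):

```rust
use futures_util::future::poll_fn;
use tower_service::Service;

// Drive any service (including a Buffer) through the contract the panic
// message above enforces: poll_ready must return Ready before call.
async fn ready_call<S, Request>(svc: &mut S, req: Request) -> Result<S::Response, S::Error>
where
    S: Service<Request>,
{
    // For Buffer, this reserves a slot in the bounded channel.
    poll_fn(|cx| svc.poll_ready(cx)).await?;
    // Safe to call now; the reserved slot guarantees the send succeeds.
    svc.call(req).await
}
```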

View File

@ -1,213 +0,0 @@
use futures_util::pin_mut;
use std::future::Future;
use std::{cell::RefCell, thread};
use tokio_executor::{SpawnError, TypedExecutor};
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
use tower_buffer::{error, Buffer};
use tower_service::Service;
use tower_test::{assert_request_eq, mock};
#[test]
fn req_and_res() {
task::mock(|cx| {
let (mut service, handle) = new_service();
let response = service.call("hello");
pin_mut!(response);
pin_mut!(handle);
assert_request_eq!(handle, "hello").send_response("world");
assert_eq!(assert_ready_ok!(response.as_mut().poll(cx)), "world");
});
}
#[test]
fn clears_canceled_requests() {
task::mock(|cx| {
let (mut service, handle) = new_service();
pin_mut!(handle);
handle.allow(1);
let res1 = service.call("hello");
pin_mut!(res1);
let send_response1 = assert_request_eq!(handle, "hello");
// don't respond yet, new requests will get buffered
let res2 = service.call("hello2");
assert_pending!(handle.as_mut().poll_request(cx));
let res3 = service.call("hello3");
pin_mut!(res3);
drop(res2);
send_response1.send_response("world");
assert_eq!(assert_ready_ok!(res1.poll(cx)), "world");
// res2 was dropped, so it should have been canceled in the buffer
handle.allow(1);
assert_request_eq!(handle, "hello3").send_response("world3");
assert_eq!(assert_ready_ok!(res3.poll(cx)), "world3");
});
}
#[test]
fn when_inner_is_not_ready() {
task::mock(|cx| {
let (mut service, handle) = new_service();
pin_mut!(handle);
// Make the service NotReady
handle.allow(0);
let res1 = service.call("hello");
pin_mut!(res1);
// Allow the Buffer's executor to do work
::std::thread::sleep(::std::time::Duration::from_millis(100));
assert_pending!(res1.as_mut().poll(cx));
assert_pending!(handle.as_mut().poll_request(cx));
handle.allow(1);
assert_request_eq!(handle, "hello").send_response("world");
assert_eq!(assert_ready_ok!(res1.poll(cx)), "world");
});
}
#[test]
fn when_inner_fails() {
task::mock(|cx| {
use std::error::Error as StdError;
let (mut service, mut handle) = new_service();
// Make the service NotReady
handle.allow(0);
handle.send_error("foobar");
let res1 = service.call("hello");
pin_mut!(res1);
// Allow the Buffer's executor to do work
::std::thread::sleep(::std::time::Duration::from_millis(100));
let e = assert_ready_err!(res1.poll(cx));
if let Some(e) = e.downcast_ref::<error::ServiceError>() {
let e = e.source().unwrap();
assert_eq!(e.to_string(), "foobar");
} else {
panic!("unexpected error type: {:?}", e);
}
});
}
#[test]
fn when_spawn_fails() {
task::mock(|cx| {
let (service, _handle) = mock::pair::<(), ()>();
let mut exec = ExecFn(|_| Err(()));
let mut service = Buffer::with_executor(service, 1, &mut exec);
let err = assert_ready_err!(service.poll_ready(cx));
assert!(
err.is::<error::SpawnError>(),
"should be a SpawnError: {:?}",
err
);
})
}
#[test]
fn poll_ready_when_worker_is_dropped_early() {
task::mock(|cx| {
let (service, _handle) = mock::pair::<(), ()>();
// drop that worker right on the floor!
let mut exec = ExecFn(|fut| {
drop(fut);
Ok(())
});
let mut service = Buffer::with_executor(service, 1, &mut exec);
let err = assert_ready_err!(service.poll_ready(cx));
assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
});
}
#[test]
fn response_future_when_worker_is_dropped_early() {
task::mock(|cx| {
let (service, mut handle) = mock::pair::<_, ()>();
// hold the worker in a cell until we want to drop it later
let cell = RefCell::new(None);
let mut exec = ExecFn(|fut| {
*cell.borrow_mut() = Some(fut);
Ok(())
});
let mut service = Buffer::with_executor(service, 1, &mut exec);
// keep the request in the worker
handle.allow(0);
let response = service.call("hello");
pin_mut!(response);
// drop the worker (like an executor closing up)
cell.borrow_mut().take();
let err = assert_ready_err!(response.poll(cx));
assert!(err.is::<error::Closed>(), "should be a Closed: {:?}", err);
})
}
type Mock = mock::Mock<&'static str, &'static str>;
type Handle = mock::Handle<&'static str, &'static str>;
struct Exec;
impl<F> TypedExecutor<F> for Exec
where
F: Future<Output = ()> + Send + 'static,
{
fn spawn(&mut self, fut: F) -> Result<(), SpawnError> {
thread::spawn(move || {
let mut mock = tokio_test::task::MockTask::new();
pin_mut!(fut);
while mock.poll(fut.as_mut()).is_pending() {}
});
Ok(())
}
}
struct ExecFn<Func>(Func);
impl<Func, F> TypedExecutor<F> for ExecFn<Func>
where
Func: Fn(F) -> Result<(), ()>,
F: Future<Output = ()> + Send + 'static,
{
fn spawn(&mut self, fut: F) -> Result<(), SpawnError> {
(self.0)(fut).map_err(|()| SpawnError::shutdown())
}
}
fn new_service() -> (Buffer<Mock, &'static str>, Handle) {
let (service, handle) = mock::pair();
// bound is >0 here because clears_canceled_requests needs multiple outstanding requests
let service = Buffer::with_executor(service, 10, &mut Exec);
(service, handle)
}

View File

@ -1,7 +0,0 @@
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (April 26, 2019)
- Initial release

View File

@ -1,27 +0,0 @@
[package]
name = "tower-discover"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-discover/0.3.0-alpha.1"
description = """
Abstracts over service discovery strategies.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
futures-core-preview = "=0.3.0-alpha.18"
tower-service = "=0.3.0-alpha.1"
pin-project = "=0.4.0-alpha.11"

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Discovery
Abstracts over service discovery strategies.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,94 +0,0 @@
#![doc(html_root_url = "https://docs.rs/tower-discover/0.3.0-alpha.1")]
#![deny(rust_2018_idioms)]
#![allow(elided_lifetimes_in_paths)]
//! # Tower service discovery
//!
//! Service discovery is the automatic detection of services available to the
//! consumer. These services typically live on other servers and are accessible
//! via the network; however, it is possible to discover services available in
//! other processes, or even within the same process.
mod error;
mod list;
mod stream;
pub use crate::{list::ServiceList, stream::ServiceStream};
use std::hash::Hash;
use std::ops;
use std::{
pin::Pin,
task::{Context, Poll},
};
/// Provide a uniform set of services able to satisfy a request.
///
/// This set of services may be updated over time. On each change to the set, a
/// new `Change` is yielded by `Discover`.
///
/// See crate documentation for more details.
pub trait Discover {
/// A key identifying an endpoint in the service set.
type Key: Hash + Eq;
/// The service type yielded for each endpoint.
type Service;
/// Error produced during discovery
type Error;
/// Yields the next discovery change set.
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>>;
}
// delegate through Pin
impl<P> Discover for Pin<P>
where
P: Unpin + ops::DerefMut,
P::Target: Discover,
{
type Key = <<P as ops::Deref>::Target as Discover>::Key;
type Service = <<P as ops::Deref>::Target as Discover>::Service;
type Error = <<P as ops::Deref>::Target as Discover>::Error;
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
Pin::get_mut(self).as_mut().poll_discover(cx)
}
}
impl<D: ?Sized + Discover + Unpin> Discover for &mut D {
type Key = D::Key;
type Service = D::Service;
type Error = D::Error;
fn poll_discover(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
Discover::poll_discover(Pin::new(&mut **self), cx)
}
}
impl<D: ?Sized + Discover + Unpin> Discover for Box<D> {
type Key = D::Key;
type Service = D::Service;
type Error = D::Error;
fn poll_discover(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
D::poll_discover(Pin::new(&mut *self), cx)
}
}
/// A change in the service set
pub enum Change<K, V> {
Insert(K, V),
Remove(K),
}
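
As a hedged sketch of implementing the trait exactly as defined above, here is a hypothetical `StaticDiscover` that drains a queue of pre-built services and yields one `Change::Insert` per poll (the `Unpin` bound exists only so the example can safely move out of the `Pin`):

```rust
use std::collections::VecDeque;
use std::convert::Infallible;
use std::pin::Pin;
use std::task::{Context, Poll};

// Assumes the `Discover` trait and `Change` enum above are in scope,
// e.g. `use tower_discover::{Change, Discover};`.
struct StaticDiscover<S> {
    services: VecDeque<(usize, S)>,
}

impl<S: Unpin> Discover for StaticDiscover<S> {
    type Key = usize;
    type Service = S;
    type Error = Infallible;

    fn poll_discover(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
        // The whole struct is Unpin (S: Unpin), so leaving the Pin is safe.
        match self.get_mut().services.pop_front() {
            Some((key, svc)) => Poll::Ready(Ok(Change::Insert(key, svc))),
            // No further changes will ever arrive; a production implementation
            // would register the waker before returning Pending.
            None => Poll::Pending,
        }
    }
}
```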

View File

@ -1,50 +0,0 @@
use crate::{Change, Discover};
use futures_core::{ready, TryStream};
use pin_project::pin_project;
use std::hash::Hash;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service;
/// Dynamic service discovery based on a stream of service changes.
#[pin_project]
pub struct ServiceStream<S> {
#[pin]
inner: S,
}
impl<S> ServiceStream<S> {
pub fn new<K, Svc, Request>(services: S) -> Self
where
S: TryStream<Ok = Change<K, Svc>>,
K: Hash + Eq,
Svc: Service<Request>,
{
ServiceStream { inner: services }
}
}
impl<S, K, Svc> Discover for ServiceStream<S>
where
K: Hash + Eq,
S: TryStream<Ok = Change<K, Svc>>,
{
type Key = K;
type Service = Svc;
type Error = S::Error;
fn poll_discover(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
match ready!(self.project().inner.try_poll_next(cx)).transpose()? {
Some(c) => Poll::Ready(Ok(c)),
None => {
// there are no more service changes coming
Poll::Pending
}
}
}
}

View File

@ -1,7 +0,0 @@
# 0.3.0-alpha.1
- Move to `std::future`
# 0.1.0 (unreleased)
- Initial release

View File

@ -1,36 +0,0 @@
[package]
name = "tower-filter"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-filter/0.3.0-alpha.1"
description = """
Conditionally allow requests to be dispatched to a service based on the result
of a predicate.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
publish = false
[dependencies]
tower-service = "=0.3.0-alpha.1"
tower-layer = { version = "=0.3.0-alpha.1", path = "../tower-layer" }
pin-project = { version = "=0.4.0-alpha.11", features = ["project_attr"] }
futures-core-preview = "=0.3.0-alpha.18"
[dev-dependencies]
tower-test = { version = "=0.3.0-alpha.1", path = "../tower-test" }
tokio-test = "=0.2.0-alpha.4"
tokio = "=0.2.0-alpha.4"
futures-util-preview = "=0.3.0-alpha.18"

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,14 +0,0 @@
# Tower Filter
Conditionally allow requests to be dispatched to a service based on the result
of a predicate.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,20 +0,0 @@
[package]
name = "tower-hedge"
version = "0.3.0-alpha.1"
authors = ["Alex Leong <adlleong@gmail.com>"]
edition = "2018"
publish = false
[dependencies]
hdrhistogram = "6.0"
log = "0.4.1"
tower-service = "0.3.0-alpha.1"
tower-filter = { version = "0.3.0-alpha.1", path = "../tower-filter" }
tokio-timer = "0.3.0-alpha.4"
futures-util-preview = "0.3.0-alpha.18"
pin-project = "0.4.0-alpha.10"
[dev-dependencies]
tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" }
tokio-test = "0.2.0-alpha.4"
tokio-executor = "0.2.0-alpha.4"

View File

@ -1,193 +0,0 @@
use futures_util::pin_mut;
use std::future::Future;
use std::time::Duration;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, clock, task};
use tower_hedge::{Hedge, Policy};
use tower_service::Service;
use tower_test::assert_request_eq;
#[test]
fn hedge_orig_completes_first() {
task::mock(|cx| {
clock::mock(|time| {
let (mut service, handle) = new_service(TestPolicy);
pin_mut!(handle);
assert_ready_ok!(service.poll_ready(cx));
let fut = service.call("orig");
pin_mut!(fut);
// Check that orig request has been issued.
let req = assert_request_eq!(handle, "orig");
// Check fut is not ready.
assert_pending!(fut.as_mut().poll(cx));
// Check hedge has not been issued.
assert_pending!(handle.as_mut().poll_request(cx));
time.advance(Duration::from_millis(10));
// Check fut is not ready.
assert_pending!(fut.as_mut().poll(cx));
// Check that the hedge has been issued.
let _hedge_req = assert_request_eq!(handle, "orig");
req.send_response("orig-done");
// Check that fut gets orig response.
assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "orig-done");
});
});
}
#[test]
fn hedge_hedge_completes_first() {
task::mock(|cx| {
clock::mock(|time| {
let (mut service, handle) = new_service(TestPolicy);
pin_mut!(handle);
assert_ready_ok!(service.poll_ready(cx));
let fut = service.call("orig");
pin_mut!(fut);
// Check that orig request has been issued.
let _req = assert_request_eq!(handle, "orig");
// Check fut is not ready.
assert_pending!(fut.as_mut().poll(cx));
// Check hedge has not been issued.
assert_pending!(handle.as_mut().poll_request(cx));
time.advance(Duration::from_millis(10));
// Check fut is not ready.
assert_pending!(fut.as_mut().poll(cx));
// Check that the hedge has been issued.
let hedge_req = assert_request_eq!(handle, "orig");
hedge_req.send_response("hedge-done");
// Check that fut gets hedge response.
assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "hedge-done");
});
});
}
#[test]
fn completes_before_hedge() {
task::mock(|cx| {
clock::mock(|_| {
let (mut service, handle) = new_service(TestPolicy);
pin_mut!(handle);
assert_ready_ok!(service.poll_ready(cx));
let fut = service.call("orig");
pin_mut!(fut);
// Check that orig request has been issued.
let req = assert_request_eq!(handle, "orig");
// Check fut is not ready.
assert_pending!(fut.as_mut().poll(cx));
req.send_response("orig-done");
// Check hedge has not been issued.
assert_pending!(handle.as_mut().poll_request(cx));
// Check that fut gets orig response.
assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "orig-done");
});
});
}
#[test]
fn request_not_retryable() {
task::mock(|cx| {
clock::mock(|time| {
let (mut service, handle) = new_service(TestPolicy);
pin_mut!(handle);
assert_ready_ok!(service.poll_ready(cx));
let fut = service.call(NOT_RETRYABLE);
pin_mut!(fut);
// Check that orig request has been issued.
let req = assert_request_eq!(handle, NOT_RETRYABLE);
// Check fut is not ready.
assert_pending!(fut.as_mut().poll(cx));
// Check hedge has not been issued.
assert_pending!(handle.as_mut().poll_request(cx));
time.advance(Duration::from_millis(10));
// Check fut is not ready.
assert_pending!(fut.as_mut().poll(cx));
// Check hedge has not been issued.
assert_pending!(handle.as_mut().poll_request(cx));
req.send_response("orig-done");
// Check that fut gets orig response.
assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "orig-done");
});
});
}
#[test]
fn request_not_clonable() {
task::mock(|cx| {
clock::mock(|time| {
let (mut service, handle) = new_service(TestPolicy);
pin_mut!(handle);
assert_ready_ok!(service.poll_ready(cx));
let fut = service.call(NOT_CLONABLE);
pin_mut!(fut);
// Check that orig request has been issued.
let req = assert_request_eq!(handle, NOT_CLONABLE);
// Check fut is not ready.
assert_pending!(fut.as_mut().poll(cx));
// Check hedge has not been issued.
assert_pending!(handle.as_mut().poll_request(cx));
time.advance(Duration::from_millis(10));
// Check fut is not ready.
assert_pending!(fut.as_mut().poll(cx));
// Check hedge has not been issued.
assert_pending!(handle.as_mut().poll_request(cx));
req.send_response("orig-done");
// Check that fut gets orig response.
assert_eq!(assert_ready_ok!(fut.as_mut().poll(cx)), "orig-done");
});
});
}
type Req = &'static str;
type Res = &'static str;
type Mock = tower_test::mock::Mock<Req, Res>;
type Handle = tower_test::mock::Handle<Req, Res>;
static NOT_RETRYABLE: &'static str = "NOT_RETRYABLE";
static NOT_CLONABLE: &'static str = "NOT_CLONABLE";
#[derive(Clone)]
struct TestPolicy;
impl tower_hedge::Policy<Req> for TestPolicy {
fn can_retry(&self, req: &Req) -> bool {
*req != NOT_RETRYABLE
}
fn clone_request(&self, req: &Req) -> Option<Req> {
if *req == NOT_CLONABLE {
None
} else {
Some(req)
}
}
}
fn new_service<P: Policy<Req> + Clone>(policy: P) -> (Hedge<Mock, P>, Handle) {
let (service, handle) = tower_test::mock::pair();
let mock_latencies: [u64; 10] = [1, 1, 1, 1, 1, 1, 1, 1, 10, 10];
let service = Hedge::new_with_mock_latencies(
service,
policy,
10,
0.9,
Duration::from_secs(60),
&mock_latencies,
);
(service, handle)
}
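
For comparison with `TestPolicy` above, a hedged sketch of a `Policy` for an owned request type, where every request is retryable and cloning is just a cheap `String` clone (the `RetryAll` name is illustrative):

```rust
use tower_hedge::Policy;

#[derive(Clone)]
struct RetryAll;

impl Policy<String> for RetryAll {
    fn can_retry(&self, _req: &String) -> bool {
        // Every request may be hedged.
        true
    }

    fn clone_request(&self, req: &String) -> Option<String> {
        // Owned strings are cheap to clone, so always produce a hedge copy.
        Some(req.clone())
    }
}
```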

View File

@ -1,3 +1,12 @@
# 0.3.0 (November 29, 2019)
- Move layer builder from `tower-util` to `tower-layer`.
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`

View File

@ -8,13 +8,13 @@ name = "tower-layer"
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0-alpha.1"
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-layer/0.3.0-alpha.1"
documentation = "https://docs.rs/tower-layer/0.3.0-alpha.2"
description = """
Decorates a `Service` to allow easy composition between `Service`s.
"""
@ -22,8 +22,6 @@ categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
futures-core-preview = "=0.3.0-alpha.18"
tower-service = "=0.3.0-alpha.1"
[dev-dependencies]
void = "1.0.2"
tower-service = { version = "0.3.0" }

View File

@ -1,5 +1,5 @@
use super::Layer;
use std::fmt;
use tower_layer::Layer;
/// A no-op middleware.
///
@ -27,7 +27,7 @@ impl<S> Layer<S> for Identity {
}
impl fmt::Debug for Identity {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Identity").finish()
}
}

View File

@ -1,5 +1,10 @@
#![doc(html_root_url = "https://docs.rs/tower-layer/0.3.0-alpha.1")]
#![deny(missing_docs, rust_2018_idioms)]
#![doc(html_root_url = "https://docs.rs/tower-layer/0.3.0-alpha.2")]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
//! Layer traits and extensions.
//!
@ -8,6 +13,11 @@
//!
//! A middleware implements the [`Layer`] and [`Service`] trait.
mod identity;
mod stack;
pub use self::{identity::Identity, stack::Stack};
/// Decorates a `Service`, transforming either the request or the response.
///
/// Often, many of the pieces needed for writing network applications can be
@ -25,7 +35,6 @@
/// # use std::task::{Poll, Context};
/// # use tower_layer::Layer;
/// # use std::fmt;
/// # use void::Void;
///
/// pub struct LogLayer {
/// target: &'static str,
@ -79,3 +88,14 @@ pub trait Layer<S> {
/// that has been decorated with the middleware.
fn layer(&self, inner: S) -> Self::Service;
}
impl<'a, T, S> Layer<S> for &'a T
where
T: ?Sized + Layer<S>,
{
type Service = T::Service;
fn layer(&self, inner: S) -> Self::Service {
(**self).layer(inner)
}
}
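
A hedged sketch of a hand-written layer in the spirit of the `LogLayer` example the doc comment above references; the names are illustrative, and the wrapper simply logs before delegating to the inner service:

```rust
use std::task::{Context, Poll};
use tower_layer::Layer;
use tower_service::Service;

struct LogLayer {
    target: &'static str,
}

impl<S> Layer<S> for LogLayer {
    type Service = LogService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        LogService { target: self.target, inner }
    }
}

struct LogService<S> {
    target: &'static str,
    inner: S,
}

impl<S, Request> Service<Request> for LogService<S>
where
    S: Service<Request>,
    Request: std::fmt::Debug,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, request: Request) -> Self::Future {
        // Log, then delegate; the inner future is returned unchanged.
        println!("request to {}: {:?}", self.target, request);
        self.inner.call(request)
    }
}
```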

View File

@ -1,5 +1,5 @@
use super::Layer;
use std::fmt;
use tower_layer::Layer;
/// Two middlewares chained together.
#[derive(Clone)]
@ -34,7 +34,7 @@ where
Inner: fmt::Debug,
Outer: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// The generated output of nested `Stack`s is very noisy and makes
// it harder to understand what is in a `ServiceBuilder`.
//

View File

@ -1,7 +0,0 @@
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (April 26, 2019)
- Initial release

View File

@ -1,35 +0,0 @@
[package]
name = "tower-limit"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-limit/0.3.0-alpha.1"
description = """
Limit maximum request rate to a `Service`.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
futures-core-preview = "=0.3.0-alpha.18"
tower-service = "=0.3.0-alpha.1"
tower-layer = { version = "=0.3.0-alpha.1", path = "../tower-layer" }
tokio-sync = "=0.2.0-alpha.4"
tokio-timer = "=0.3.0-alpha.4"
pin-project = "=0.4.0-alpha.11"
[dev-dependencies]
tower-test = { version = "=0.3.0-alpha.1", path = "../tower-test" }
tokio-test = "=0.2.0-alpha.4"
futures-util-preview = "=0.3.0-alpha.18"

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Rate Limit
Limit maximum request rate to a `Service`.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,46 +0,0 @@
//! Future types
//!
use super::Error;
use futures_core::ready;
use pin_project::{pin_project, pinned_drop};
use std::sync::Arc;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio_sync::semaphore::Semaphore;
/// Future for the `ConcurrencyLimit` service.
#[pin_project(PinnedDrop)]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
inner: T,
semaphore: Arc<Semaphore>,
}
impl<T> ResponseFuture<T> {
pub(crate) fn new(inner: T, semaphore: Arc<Semaphore>) -> ResponseFuture<T> {
ResponseFuture { inner, semaphore }
}
}
impl<F, T, E> Future for ResponseFuture<F>
where
F: Future<Output = Result<T, E>>,
E: Into<Error>,
{
type Output = Result<T, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(ready!(self.project().inner.poll(cx)).map_err(Into::into))
}
}
#[pinned_drop]
impl<T> PinnedDrop for ResponseFuture<T> {
fn drop(mut self: Pin<&mut Self>) {
self.project().semaphore.add_permits(1);
}
}

View File

@ -1,108 +0,0 @@
use super::{future::ResponseFuture, Error};
use tower_service::Service;
use futures_core::ready;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio_sync::semaphore::{self, Semaphore};
/// Enforces a limit on the concurrent number of requests the underlying
/// service can handle.
#[derive(Debug)]
pub struct ConcurrencyLimit<T> {
inner: T,
limit: Limit,
}
#[derive(Debug)]
struct Limit {
semaphore: Arc<Semaphore>,
permit: semaphore::Permit,
}
impl<T> ConcurrencyLimit<T> {
/// Create a new concurrency limiter.
pub fn new(inner: T, max: usize) -> Self {
ConcurrencyLimit {
inner,
limit: Limit {
semaphore: Arc::new(Semaphore::new(max)),
permit: semaphore::Permit::new(),
},
}
}
/// Get a reference to the inner service
pub fn get_ref(&self) -> &T {
&self.inner
}
/// Get a mutable reference to the inner service
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
/// Consume `self`, returning the inner service
pub fn into_inner(self) -> T {
self.inner
}
}
impl<S, Request> Service<Request> for ConcurrencyLimit<S>
where
S: Service<Request>,
S::Error: Into<Error>,
{
type Response = S::Response;
type Error = Error;
type Future = ResponseFuture<S::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.limit.permit.poll_acquire(cx, &self.limit.semaphore))?;
Poll::Ready(ready!(self.inner.poll_ready(cx)).map_err(Into::into))
}
fn call(&mut self, request: Request) -> Self::Future {
// Make sure a permit has been acquired
if self
.limit
.permit
.try_acquire(&self.limit.semaphore)
.is_err()
{
panic!("max requests in-flight; poll_ready must be called first");
}
// Call the inner service
let future = self.inner.call(request);
// Forget the permit, the permit will be returned when
// `future::ResponseFuture` is dropped.
self.limit.permit.forget();
ResponseFuture::new(future, self.limit.semaphore.clone())
}
}
impl<S> Clone for ConcurrencyLimit<S>
where
S: Clone,
{
fn clone(&self) -> ConcurrencyLimit<S> {
ConcurrencyLimit {
inner: self.inner.clone(),
limit: Limit {
semaphore: self.limit.semaphore.clone(),
permit: semaphore::Permit::new(),
},
}
}
}
impl Drop for Limit {
fn drop(&mut self) {
self.permit.release(&self.semaphore);
}
}
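
A hedged sketch of the cloning behaviour the `Clone` impl above encodes: clones share the `Arc<Semaphore>` but start with a fresh permit slot, so the total number of in-flight requests across all clones stays at or below `max` (assumes `ConcurrencyLimit` is in scope; `shared_limit` is an illustrative name):

```rust
// Assumes `use tower_limit::concurrency::ConcurrencyLimit;`.
fn shared_limit<S: Clone>(svc: S, max: usize) -> (ConcurrencyLimit<S>, ConcurrencyLimit<S>) {
    // Both halves hold the same Arc<Semaphore>, so at most `max` requests can
    // be in flight across the two clones combined.
    let limited = ConcurrencyLimit::new(svc, max);
    let also_limited = limited.clone();
    (limited, also_limited)
}
```

Each clone still has to win a permit in `poll_ready`; the permit is forgotten in `call` and handed back when the `ResponseFuture` is dropped, as shown in the future module above.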

View File

@ -1,14 +0,0 @@
#![doc(html_root_url = "https://docs.rs/tower-limit/0.3.0-alpha.1")]
#![cfg_attr(test, deny(warnings))]
#![deny(missing_debug_implementations, missing_docs, rust_2018_idioms)]
#![allow(elided_lifetimes_in_paths)]
//! Tower middleware for limiting requests.
pub mod concurrency;
pub mod rate;
pub use crate::{
concurrency::{ConcurrencyLimit, ConcurrencyLimitLayer},
rate::{RateLimit, RateLimitLayer},
};

View File

@ -1,3 +0,0 @@
use std::error;
pub(crate) type Error = Box<dyn error::Error + Send + Sync>;

View File

@ -1,36 +0,0 @@
//! Future types
use super::error::Error;
use futures_core::ready;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Future for the `RateLimit` service.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
inner: T,
}
impl<T> ResponseFuture<T> {
pub(crate) fn new(inner: T) -> ResponseFuture<T> {
ResponseFuture { inner }
}
}
impl<F, T, E> Future for ResponseFuture<F>
where
F: Future<Output = Result<T, E>>,
Error: From<E>,
{
type Output = Result<T, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(ready!(self.project().inner.poll(cx))?))
}
}

View File

@ -1,261 +0,0 @@
use futures_util::{future::poll_fn, pin_mut};
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, block_on, task::MockTask};
use tower_limit::concurrency::ConcurrencyLimit;
use tower_service::Service;
use tower_test::{assert_request_eq, mock};
#[test]
fn basic_service_limit_functionality_with_poll_ready() {
let mut task = MockTask::new();
let (mut service, handle) = new_service(2);
pin_mut!(handle);
block_on(poll_fn(|cx| service.poll_ready(cx))).unwrap();
let r1 = service.call("hello 1");
block_on(poll_fn(|cx| service.poll_ready(cx))).unwrap();
let r2 = service.call("hello 2");
task.enter(|cx| {
assert_pending!(service.poll_ready(cx));
});
assert!(!task.is_woken());
// The request gets passed through
assert_request_eq!(handle, "hello 1").send_response("world 1");
// The next request gets passed through
assert_request_eq!(handle, "hello 2").send_response("world 2");
// There are no more requests
task.enter(|cx| {
assert_pending!(handle.as_mut().poll_request(cx));
});
assert_eq!(block_on(r1).unwrap(), "world 1");
assert!(task.is_woken());
// Another request can be sent
task.enter(|cx| {
assert_ready_ok!(service.poll_ready(cx));
});
let r3 = service.call("hello 3");
task.enter(|cx| {
assert_pending!(service.poll_ready(cx));
});
assert_eq!(block_on(r2).unwrap(), "world 2");
// The request gets passed through
assert_request_eq!(handle, "hello 3").send_response("world 3");
assert_eq!(block_on(r3).unwrap(), "world 3");
}
#[test]
fn basic_service_limit_functionality_without_poll_ready() {
let mut task = MockTask::new();
let (mut service, handle) = new_service(2);
pin_mut!(handle);
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
let r1 = service.call("hello 1");
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
let r2 = service.call("hello 2");
assert_pending!(task.enter(|cx| service.poll_ready(cx)));
// The request gets passed through
assert_request_eq!(handle, "hello 1").send_response("world 1");
assert!(!task.is_woken());
// The next request gets passed through
assert_request_eq!(handle, "hello 2").send_response("world 2");
assert!(!task.is_woken());
// There are no more requests
assert_pending!(task.enter(|cx| handle.as_mut().poll_request(cx)));
assert_eq!(block_on(r1).unwrap(), "world 1");
assert!(task.is_woken());
// One more request can be sent
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
let r4 = service.call("hello 4");
assert_pending!(task.enter(|cx| service.poll_ready(cx)));
assert_eq!(block_on(r2).unwrap(), "world 2");
assert!(task.is_woken());
// The request gets passed through
assert_request_eq!(handle, "hello 4").send_response("world 4");
assert_eq!(block_on(r4).unwrap(), "world 4");
}
#[test]
fn request_without_capacity() {
let mut task = MockTask::new();
let (mut service, _) = new_service(0);
task.enter(|cx| {
assert_pending!(service.poll_ready(cx));
});
}
#[test]
fn reserve_capacity_without_sending_request() {
let mut task = MockTask::new();
let (mut s1, handle) = new_service(1);
pin_mut!(handle);
let mut s2 = s1.clone();
// Reserve capacity in s1
task.enter(|cx| {
assert_ready_ok!(s1.poll_ready(cx));
});
// Service 2 cannot get capacity
task.enter(|cx| {
assert_pending!(s2.poll_ready(cx));
});
// s1 sends the request, then s2 is able to get capacity
let r1 = s1.call("hello");
assert_request_eq!(handle, "hello").send_response("world");
task.enter(|cx| {
assert_pending!(s2.poll_ready(cx));
});
block_on(r1).unwrap();
task.enter(|cx| {
assert_ready_ok!(s2.poll_ready(cx));
});
}
#[test]
fn service_drop_frees_capacity() {
let mut task = MockTask::new();
let (mut s1, _handle) = new_service(1);
let mut s2 = s1.clone();
// Reserve capacity in s1
assert_ready_ok!(task.enter(|cx| s1.poll_ready(cx)));
// Service 2 cannot get capacity
task.enter(|cx| {
assert_pending!(s2.poll_ready(cx));
});
drop(s1);
assert!(task.is_woken());
assert_ready_ok!(task.enter(|cx| s2.poll_ready(cx)));
}
#[test]
fn response_error_releases_capacity() {
let mut task = MockTask::new();
let (mut s1, handle) = new_service(1);
pin_mut!(handle);
let mut s2 = s1.clone();
// Reserve capacity in s1
task.enter(|cx| {
assert_ready_ok!(s1.poll_ready(cx));
});
// s1 sends the request, then s2 is able to get capacity
let r1 = s1.call("hello");
assert_request_eq!(handle, "hello").send_error("boom");
block_on(r1).unwrap_err();
task.enter(|cx| {
assert_ready_ok!(s2.poll_ready(cx));
});
}
#[test]
fn response_future_drop_releases_capacity() {
let mut task = MockTask::new();
let (mut s1, _handle) = new_service(1);
let mut s2 = s1.clone();
// Reserve capacity in s1
task.enter(|cx| {
assert_ready_ok!(s1.poll_ready(cx));
});
// s1 sends the request, then s2 is able to get capacity
let r1 = s1.call("hello");
task.enter(|cx| {
assert_pending!(s2.poll_ready(cx));
});
drop(r1);
task.enter(|cx| {
assert_ready_ok!(s2.poll_ready(cx));
});
}
#[test]
fn multi_waiters() {
let mut task1 = MockTask::new();
let mut task2 = MockTask::new();
let mut task3 = MockTask::new();
let (mut s1, _handle) = new_service(1);
let mut s2 = s1.clone();
let mut s3 = s1.clone();
// Reserve capacity in s1
task1.enter(|cx| assert_ready_ok!(s1.poll_ready(cx)));
// s2 and s3 are not ready
task2.enter(|cx| assert_pending!(s2.poll_ready(cx)));
task3.enter(|cx| assert_pending!(s3.poll_ready(cx)));
drop(s1);
assert!(task2.is_woken());
assert!(!task3.is_woken());
drop(s2);
assert!(task3.is_woken());
}
type Mock = mock::Mock<&'static str, &'static str>;
type Handle = mock::Handle<&'static str, &'static str>;
fn new_service(max: usize) -> (ConcurrencyLimit<Mock>, Handle) {
let (service, handle) = mock::pair();
let service = ConcurrencyLimit::new(service, max);
(service, handle)
}

View File

@ -1,52 +0,0 @@
use futures_util::pin_mut;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, clock, task::MockTask};
use tower_limit::rate::*;
use tower_service::*;
use tower_test::{assert_request_eq, mock};
use std::future::Future;
use std::time::Duration;
#[test]
fn reaching_capacity() {
clock::mock(|time| {
let mut task = MockTask::new();
let (mut service, handle) = new_service(Rate::new(1, from_millis(100)));
pin_mut!(handle);
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
let response = service.call("hello");
pin_mut!(response);
assert_request_eq!(handle, "hello").send_response("world");
assert_ready_ok!(task.enter(|cx| response.poll(cx)), "world");
assert_pending!(task.enter(|cx| service.poll_ready(cx)));
assert_pending!(task.enter(|cx| handle.as_mut().poll_request(cx)));
time.advance(Duration::from_millis(100));
assert_ready_ok!(task.enter(|cx| service.poll_ready(cx)));
// Send a second request
let response = service.call("two");
pin_mut!(response);
assert_request_eq!(handle, "two").send_response("done");
assert_ready_ok!(task.enter(|cx| response.poll(cx)), "done");
});
}
type Mock = mock::Mock<&'static str, &'static str>;
type Handle = mock::Handle<&'static str, &'static str>;
fn new_service(rate: Rate) -> (RateLimit<Mock>, Handle) {
let (service, handle) = mock::pair();
let service = RateLimit::new(service, rate);
(service, handle)
}
fn from_millis(n: u64) -> Duration {
Duration::from_millis(n)
}

View File

@ -1,7 +0,0 @@
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (April 26, 2019)
- Initial release

View File

@ -1,34 +0,0 @@
[package]
name = "tower-load-shed"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-load-shed/0.3.0-alpha.1"
description = """
Immediately reject requests if the inner service is not ready. This is also
known as load-shedding.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
tower-service = "=0.3.0-alpha.1"
tower-layer = { version = "=0.3.0-alpha.1", path = "../tower-layer" }
pin-project = { version = "=0.4.0-alpha.11", features = ["project_attr"] }
futures-core-preview = "=0.3.0-alpha.18"
[dev-dependencies]
tokio-test = "=0.2.0-alpha.4"
tower-test = { version = "=0.3.0-alpha.1", path = "../tower-test" }
futures-util-preview = "=0.3.0-alpha.18"

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,14 +0,0 @@
# Tower Load Shed
Immediately reject requests if the inner service is not ready. This is also
known as load-shedding.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,49 +0,0 @@
use futures_util::pin_mut;
use std::future::Future;
use tokio_test::{assert_ready_err, assert_ready_ok, task::mock};
use tower_load_shed::{self, LoadShed};
use tower_service::Service;
use tower_test::{assert_request_eq, mock};
#[test]
fn when_ready() {
mock(|cx| {
let (mut service, handle) = new_service();
pin_mut!(handle);
assert_ready_ok!(service.poll_ready(cx), "overload always reports ready");
let response = service.call("hello");
pin_mut!(response);
assert_request_eq!(handle, "hello").send_response("world");
assert_eq!(assert_ready_ok!(response.poll(cx)), "world");
});
}
#[test]
fn when_not_ready() {
mock(|cx| {
let (mut service, handle) = new_service();
pin_mut!(handle);
handle.allow(0);
assert_ready_ok!(service.poll_ready(cx), "overload always reports ready");
let fut = service.call("hello");
pin_mut!(fut);
let err = assert_ready_err!(fut.poll(cx));
assert!(err.is::<tower_load_shed::error::Overloaded>());
});
}
type Mock = mock::Mock<&'static str, &'static str>;
type Handle = mock::Handle<&'static str, &'static str>;
fn new_service() -> (LoadShed<Mock>, Handle) {
let (service, handle) = mock::pair();
let service = LoadShed::new(service);
(service, handle)
}

View File

@ -1,7 +0,0 @@
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (unreleased)
- Initial release

View File

@ -1,34 +0,0 @@
[package]
name = "tower-load"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-load/0.3.0-alpha.1"
description = """
Strategies for measuring the load of a service
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
futures-core-preview = "=0.3.0-alpha.18"
log = "0.4.1"
tokio-timer = "=0.3.0-alpha.4"
tower-service = "=0.3.0-alpha.1"
tower-discover = { version = "=0.3.0-alpha.1", path = "../tower-discover" }
pin-project = "=0.4.0-alpha.11"
[dev-dependencies]
tokio-test = "=0.2.0-alpha.4"
futures-util-preview = "=0.3.0-alpha.18"

View File

@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,13 +0,0 @@
# Tower Load
Provides strategies for measuring a service's load.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -1,86 +0,0 @@
use futures_core::ready;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Attaches `I`-typed instruments to `V`-typed values.
///
/// This utility allows load metrics to have a protocol-agnostic means to track streams
/// past their initial response future. For example, if `V` represents an HTTP response
/// type, an implementation could add `H`-typed handles to each response's extensions to
/// detect when the response is dropped.
///
/// Handles are intended to be RAII guards that primarily implement `Drop` and update load
/// metric state as they are dropped.
///
/// A base `impl<H, V> Instrument<H, V> for NoInstrument` is provided to drop the handle
/// immediately. This is appropriate when a response is discrete and cannot comprise
/// multiple messages.
///
/// In many cases, the `Output` type is simply `V`. However, `Instrument` may alter the
/// type in order to instrument it appropriately. For example, an HTTP Instrument may
/// modify the body type: so an `Instrument` that takes values of type `http::Response<A>`
/// may output values of type `http::Response<B>`.
pub trait Instrument<H, V>: Clone {
/// The instrumented value type.
type Output;
/// Attaches an `H`-typed handle to a `V`-typed value.
fn instrument(&self, handle: H, value: V) -> Self::Output;
}
/// An `Instrument` implementation that drops each instrument immediately.
#[derive(Clone, Copy, Debug)]
pub struct NoInstrument;
/// Attaches an `I`-typed instrument to the result of an `F`-typed `Future`.
#[pin_project]
#[derive(Debug)]
pub struct InstrumentFuture<F, I, H> {
#[pin]
future: F,
handle: Option<H>,
instrument: I,
}
// ===== impl InstrumentFuture =====
impl<F, I, H> InstrumentFuture<F, I, H> {
/// Wraps a future, instrumenting its value if successful.
pub fn new(instrument: I, handle: H, future: F) -> Self {
InstrumentFuture {
future,
instrument,
handle: Some(handle),
}
}
}
impl<F, I, H, T, E> Future for InstrumentFuture<F, I, H>
where
F: Future<Output = Result<T, E>>,
I: Instrument<H, T>,
{
type Output = Result<I::Output, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let rsp = ready!(this.future.poll(cx))?;
let h = this.handle.take().expect("handle");
Poll::Ready(Ok(this.instrument.instrument(h, rsp)))
}
}
// ===== NoInstrument =====
impl<H, V> Instrument<H, V> for NoInstrument {
type Output = V;
fn instrument(&self, handle: H, value: V) -> V {
drop(handle);
value
}
}
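
For a concrete picture of the pattern described above, here is a minimal sketch (not crate code; all names are illustrative) of an `Instrument` implementation whose handle is an RAII guard over a shared in-flight counter. It assumes the `Instrument` trait defined in this file is in scope.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

// Hypothetical instrument: pairs each response with a guard that decrements
// a shared in-flight counter once the instrumented value is dropped.
#[derive(Clone, Default)]
struct InFlight {
    count: Arc<AtomicUsize>,
}

impl InFlight {
    // Whoever hands out responses creates a handle, bumping the counter.
    fn handle(&self) -> CountHandle {
        self.count.fetch_add(1, Ordering::Relaxed);
        CountHandle { count: self.count.clone() }
    }
}

// RAII handle whose only job is to update load state on `Drop`.
struct CountHandle {
    count: Arc<AtomicUsize>,
}

impl Drop for CountHandle {
    fn drop(&mut self) {
        self.count.fetch_sub(1, Ordering::Relaxed);
    }
}

// The instrumented value keeps the handle alive exactly as long as the
// response itself, so streams are tracked past the initial response future.
struct Counted<V> {
    value: V,
    _handle: CountHandle,
}

impl<V> Counted<V> {
    fn into_inner(self) -> V {
        self.value
    }
}

impl<V> Instrument<CountHandle, V> for InFlight {
    type Output = Counted<V>;

    fn instrument(&self, handle: CountHandle, value: V) -> Counted<V> {
        Counted { value, _handle: handle }
    }
}
```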


@ -1,28 +0,0 @@
//! Abstractions and utilities for measuring a service's load.
#![doc(html_root_url = "https://docs.rs/tower-load/0.3.0-alpha.1")]
#![deny(missing_docs)]
#![deny(rust_2018_idioms)]
#![deny(warnings)]
#![allow(elided_lifetimes_in_paths)]
mod constant;
mod instrument;
pub mod peak_ewma;
pub mod pending_requests;
pub use self::{
constant::Constant,
instrument::{Instrument, InstrumentFuture, NoInstrument},
peak_ewma::{PeakEwma, PeakEwmaDiscover},
pending_requests::{PendingRequests, PendingRequestsDiscover},
};
/// Exposes a load metric.
pub trait Load {
/// A comparable load metric. Lesser values are "preferable" to greater values.
type Metric: PartialOrd;
/// Obtains a service's load.
fn load(&self) -> Self::Metric;
}
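
As a minimal illustration of the trait (hypothetical names, not crate code), a wrapper can report how many requests it currently has queued; a balancer then prefers whichever instance reports the smaller metric. It assumes the `Load` trait defined above is in scope.

```rust
// Hypothetical wrapper that exposes its queue depth as the load metric.
struct Tracked<S> {
    service: S,
    queued: usize,
}

impl<S> Load for Tracked<S> {
    // `usize` already has the `PartialOrd` the trait asks for; a smaller
    // queue means a less loaded service.
    type Metric = usize;

    fn load(&self) -> usize {
        self.queued
    }
}
```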


@ -1,12 +0,0 @@
# 0.3.0-alpha.1 (September 11, 2019)
- Bump version to match all the other crates with `std::future`
# 0.1.0-alpha.2 (August 30, 2019)
- Update `tokio-io` to `alpha.4`
# 0.1.0-alpha.1 (August 26, 2019)
- Initial release


@ -1,21 +0,0 @@
[package]
name = "tower-make"
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-make/0.3.0-alpha.1"
description = """
Trait aliases for Services that produce specific types of Responses.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[features]
io = ["tokio-io"]
[dependencies]
tokio-io = { version = "=0.2.0-alpha.4", optional = true }
tower-service = "=0.3.0-alpha.1"


@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.


@ -1,13 +0,0 @@
# Tower Service Makers
Trait aliases for Services that produce specific types of Responses.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.


@ -1,16 +0,0 @@
#![doc(html_root_url = "https://docs.rs/tower-make/0.3.0-alpha.1")]
#![deny(rust_2018_idioms)]
//! Trait aliases for Services that produce specific types of Responses.
#[cfg(feature = "io")]
mod make_connection;
mod make_service;
#[cfg(feature = "io")]
pub use crate::make_connection::MakeConnection;
pub use crate::make_service::MakeService;
mod sealed {
pub trait Sealed<T> {}
}
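
The private `sealed` module above is the usual sealed-trait pattern: downstream crates cannot name `sealed::Sealed`, so they cannot implement the exported trait aliases themselves, which keeps those aliases purely a convenience over this crate's own blanket impls. A stripped-down sketch of the idea, with illustrative names rather than the crate's real API:

```rust
mod sealed {
    // Unreachable outside the defining crate, so external impls are impossible.
    pub trait Sealed<T> {}
}

// A public "alias" trait that callers may use in bounds...
pub trait MakeThing<Target>: sealed::Sealed<Target> {
    fn make(&mut self, target: Target);
}

// ...but that only this crate can implement, via a blanket impl.
impl<F, Target> sealed::Sealed<Target> for F where F: FnMut(Target) {}

impl<F, Target> MakeThing<Target> for F
where
    F: FnMut(Target),
{
    fn make(&mut self, target: Target) {
        (self)(target)
    }
}
```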


@ -1,7 +0,0 @@
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (unreleased)
- Initial release


@ -1,28 +0,0 @@
[package]
name = "tower-reconnect"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-reconnect/0.3.0-alpha.1"
description = """
Automatically recreate a new `Service` instance when an error is encountered.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
log = "0.4.1"
tower-service = "=0.3.0-alpha.1"
tower-make = { version = "=0.3.0-alpha.1", path = "../tower-make" }
pin-project = "=0.4.0-alpha.11"


@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.


@ -1,13 +0,0 @@
# Tower Reconnect
Automatically recreate a new `Service` instance when an error is encountered.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.


@ -1,31 +0,0 @@
use crate::Error;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
#[pin_project]
pub struct ResponseFuture<F> {
#[pin]
inner: F,
}
impl<F> ResponseFuture<F> {
pub(crate) fn new(inner: F) -> Self {
ResponseFuture { inner }
}
}
impl<F, T, E> Future for ResponseFuture<F>
where
F: Future<Output = Result<T, E>>,
E: Into<Error>,
{
type Output = Result<T, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx).map_err(Into::into)
}
}
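
This future's only job is to erase the concrete error type: any `E: Into<Error>` is boxed as it passes through `map_err(Into::into)`, where `Error` is the boxed-error alias used throughout these crates. A small standalone illustration of that conversion, assuming `Error = Box<dyn std::error::Error + Send + Sync>`:

```rust
use std::fmt;

type Error = Box<dyn std::error::Error + Send + Sync>;

// Hypothetical concrete error produced by some inner service.
#[derive(Debug)]
struct ConnectFailed;

impl fmt::Display for ConnectFailed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "connect failed")
    }
}

impl std::error::Error for ConnectFailed {}

fn main() {
    // The std `From` impl for boxed errors makes `Into::into` enough to
    // unify every inner error type behind one alias.
    let inner: Result<(), ConnectFailed> = Err(ConnectFailed);
    let erased: Result<(), Error> = inner.map_err(Into::into);
    assert!(erased.unwrap_err().is::<ConnectFailed>());
}
```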


@ -1,7 +0,0 @@
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (April 26, 2019)
- Initial release


@ -1,34 +0,0 @@
[package]
name = "tower-retry"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-retry/0.3.0-alpha.1"
description = """
Retry failed requests.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
tower-service = "=0.3.0-alpha.1"
tower-layer = { version = "=0.3.0-alpha.1", path = "../tower-layer" }
tokio-timer = "=0.3.0-alpha.4"
pin-project = { version = "=0.4.0-alpha.11", features = ["project_attr"] }
futures-core-preview = "=0.3.0-alpha.18"
[dev-dependencies]
tower-test = { version = "=0.3.0-alpha.1", path = "../tower-test" }
tokio-test = "=0.2.0-alpha.4"
futures-util-preview = "=0.3.0-alpha.18"


@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.


@ -1,13 +0,0 @@
# Tower Retry
Retry failed requests.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.


@ -1,184 +0,0 @@
use futures_util::{future, pin_mut};
use std::future::Future;
use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task};
use tower_retry::Policy;
use tower_service::Service;
use tower_test::{assert_request_eq, mock};
#[test]
fn retry_errors() {
task::mock(|cx| {
let (mut service, handle) = new_service(RetryErrors);
pin_mut!(handle);
assert_ready_ok!(service.poll_ready(cx));
let fut = service.call("hello");
pin_mut!(fut);
assert_request_eq!(handle, "hello").send_error("retry me");
assert_pending!(fut.as_mut().poll(cx));
assert_request_eq!(handle, "hello").send_response("world");
assert_ready_ok!(fut.poll(cx), "world");
});
}
#[test]
fn retry_limit() {
task::mock(|cx| {
let (mut service, handle) = new_service(Limit(2));
pin_mut!(handle);
assert_ready_ok!(service.poll_ready(cx));
let fut = service.call("hello");
pin_mut!(fut);
assert_request_eq!(handle, "hello").send_error("retry 1");
assert_pending!(fut.as_mut().poll(cx));
assert_request_eq!(handle, "hello").send_error("retry 2");
assert_pending!(fut.as_mut().poll(cx));
assert_request_eq!(handle, "hello").send_error("retry 3");
assert_eq!(assert_ready_err!(fut.poll(cx)).to_string(), "retry 3");
});
}
#[test]
fn retry_error_inspection() {
task::mock(|cx| {
let (mut service, handle) = new_service(UnlessErr("reject"));
pin_mut!(handle);
assert_ready_ok!(service.poll_ready(cx));
let fut = service.call("hello");
pin_mut!(fut);
assert_request_eq!(handle, "hello").send_error("retry 1");
assert_pending!(fut.as_mut().poll(cx));
assert_request_eq!(handle, "hello").send_error("reject");
assert_eq!(assert_ready_err!(fut.poll(cx)).to_string(), "reject");
});
}
#[test]
fn retry_cannot_clone_request() {
task::mock(|cx| {
let (mut service, handle) = new_service(CannotClone);
pin_mut!(handle);
assert_ready_ok!(service.poll_ready(cx));
let fut = service.call("hello");
pin_mut!(fut);
assert_request_eq!(handle, "hello").send_error("retry 1");
assert_eq!(assert_ready_err!(fut.poll(cx)).to_string(), "retry 1");
});
}
#[test]
fn success_with_cannot_clone() {
task::mock(|cx| {
// Even though the request couldn't be cloned, if the first request succeeds,
// it should succeed overall.
let (mut service, handle) = new_service(CannotClone);
pin_mut!(handle);
assert_ready_ok!(service.poll_ready(cx));
let fut = service.call("hello");
pin_mut!(fut);
assert_request_eq!(handle, "hello").send_response("world");
assert_ready_ok!(fut.poll(cx), "world");
});
}
type Req = &'static str;
type Res = &'static str;
type InnerError = &'static str;
type Error = Box<dyn std::error::Error + Send + Sync>;
type Mock = mock::Mock<Req, Res>;
type Handle = mock::Handle<Req, Res>;
#[derive(Clone)]
struct RetryErrors;
impl Policy<Req, Res, Error> for RetryErrors {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
if result.is_err() {
Some(future::ready(RetryErrors))
} else {
None
}
}
fn clone_request(&self, req: &Req) -> Option<Req> {
Some(*req)
}
}
#[derive(Clone)]
struct Limit(usize);
impl Policy<Req, Res, Error> for Limit {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
if result.is_err() && self.0 > 0 {
Some(future::ready(Limit(self.0 - 1)))
} else {
None
}
}
fn clone_request(&self, req: &Req) -> Option<Req> {
Some(*req)
}
}
#[derive(Clone)]
struct UnlessErr(InnerError);
impl Policy<Req, Res, Error> for UnlessErr {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
result.err().and_then(|err| {
if err.to_string() != self.0 {
Some(future::ready(self.clone()))
} else {
None
}
})
}
fn clone_request(&self, req: &Req) -> Option<Req> {
Some(*req)
}
}
#[derive(Clone)]
struct CannotClone;
impl Policy<Req, Res, Error> for CannotClone {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, _: Result<&Res, &Error>) -> Option<Self::Future> {
unreachable!("retry cannot be called since request isn't cloned");
}
fn clone_request(&self, _req: &Req) -> Option<Req> {
None
}
}
fn new_service<P: Policy<Req, Res, Error> + Clone>(
policy: P,
) -> (tower_retry::Retry<P, Mock>, Handle) {
let (service, handle) = mock::pair();
let service = tower_retry::Retry::new(policy, service);
(service, handle)
}
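
As one more hedged sketch (not part of the crate or these tests), a `Policy` can also gate retries on the request itself, for example only replaying requests known to be idempotent. It reuses the `Policy` trait, the `future` import, and the `Req`/`Res`/`Error` aliases defined above:

```rust
#[derive(Clone)]
struct RetryIdempotent;

impl Policy<Req, Res, Error> for RetryIdempotent {
    type Future = future::Ready<Self>;

    fn retry(&self, req: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
        // Only replay failures for requests that are safe to repeat.
        if result.is_err() && req.starts_with("GET") {
            Some(future::ready(RetryIdempotent))
        } else {
            None
        }
    }

    fn clone_request(&self, req: &Req) -> Option<Req> {
        Some(*req)
    }
}
```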


@ -1,4 +1,13 @@
# 0.3.0 (Aug 20, 2019)
# 0.3.0 (November 29, 2019)
- Update to `futures 0.3`.
- Update documentation for `std::future::Future`.
# 0.3.0-alpha.2 (September 30, 2019)
- Documentation fixes.
# 0.3.0-alpha.1 (Aug 20, 2019)
* Switch to `std::future::Future`


@ -1,5 +1,4 @@
[package]
name = "tower-service"
# When releasing to crates.io:
# - Remove path dependencies
@ -9,16 +8,20 @@ name = "tower-service"
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.2.x" git tag.
version = "0.3.0-alpha.1"
version = "0.3.0"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-service/0.3.0-alpha.1"
documentation = "https://docs.rs/tower-service/0.3.0"
description = """
Trait representing an asynchronous, request / response based, client or server.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
[dev-dependencies]
http = "0.2"


@ -9,7 +9,7 @@ simple, but powerful trait. At its heart, `Service` is just an asynchronous
function of request to response.
```
fn(Request) -> Future<Item = Response>
async fn(Request) -> Result<Response, Error>
```
Implementations of `Service` take a request, the type of which varies per


@ -1,5 +1,10 @@
#![deny(missing_docs)]
#![doc(html_root_url = "https://docs.rs/tower-service/0.3.0-alpha.1")]
#![doc(html_root_url = "https://docs.rs/tower-service/0.3.0")]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
//! Definition of the core `Service` trait to Tower
//!
@ -36,23 +41,43 @@ use std::task::{Context, Poll};
///
/// As an example, here is how an HTTP request is processed by a server:
///
/// ```rust,ignore
/// impl Service<http::Request> for HelloWorld {
/// type Response = http::Response;
/// type Error = http::Error;
/// type Future = Box<Future<Output = Result<Self::Response, Self::Error>>>;
/// ```rust
/// # use std::pin::Pin;
/// # use std::task::{Poll, Context};
/// # use std::future::Future;
/// # use tower_service::Service;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<(), Self::Error> {
/// Ok(Async::Ready(()))
/// use http::{Request, Response, StatusCode};
///
/// struct HelloWorld;
///
/// impl Service<Request<Vec<u8>>> for HelloWorld {
/// type Response = Response<Vec<u8>>;
/// type Error = http::Error;
/// type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// Poll::Ready(Ok(()))
/// }
///
/// fn call(&mut self, req: http::Request) -> Self::Future {
/// fn call(&mut self, req: Request<Vec<u8>>) -> Self::Future {
/// // create the body
/// let body: Vec<u8> = "hello, world!\n"
/// .as_bytes()
/// .to_owned();
/// // Create the HTTP response
/// let resp = http::Response::ok()
/// .with_body(b"hello world\n");
/// let resp = Response::builder()
/// .status(StatusCode::OK)
/// .body(body)
/// .expect("Unable to create `http::Response`");
///
/// // create a response in a future.
/// let fut = async {
/// Ok(resp)
/// };
///
/// // Return the response as an immediate future
/// Box::new(futures::finished(resp))
/// Box::pin(fut)
/// }
/// }
/// ```
@ -70,10 +95,10 @@ use std::task::{Context, Poll};
/// .connect("127.0.0.1:6379".parse().unwrap())
/// .unwrap();
///
/// let resp = client.call(Cmd::set("foo", "this is the value of foo"));
/// let resp = client.call(Cmd::set("foo", "this is the value of foo")).await?;
///
/// // Wait for the future to resolve
/// println!("Redis response: {:?}", await(resp));
/// println!("Redis response: {:?}", resp);
/// ```
///
/// # Middleware / Layer
@ -89,15 +114,16 @@ use std::task::{Context, Poll};
/// ```rust,ignore
/// use tower_service::Service;
/// use tower_layer::Layer;
/// use futures::FutureExt;
/// use std::future::Future;
/// use std::task::{Context, Poll};
/// use std::time::Duration;
/// use std::pin::Pin;
///
///
/// pub struct Timeout<T> {
/// inner: T,
/// delay: Duration,
/// timer: Timer,
/// timeout: Duration,
/// }
///
/// pub struct TimeoutLayer(Duration);
@ -122,21 +148,21 @@ use std::task::{Context, Poll};
/// {
/// type Response = T::Response;
/// type Error = T::Error;
/// type Future = Box<Future<Output = Result<Self::Response, Self::Error>>>;
/// type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// self.inner.poll_ready(cx).map_err(Into::into)
/// }
///
/// fn call(&mut self, req: Request) -> Self::Future {
/// let timeout = tokio_timer::sleep(self.timeout)
/// .then(|_| Err(Self::Error::from(Expired)));
/// let timeout = tokio_timer::delay_for(self.timeout)
/// .map(|_| Err(Self::Error::from(Expired)));
///
/// let f = self.inner.call(req).select(timeout)
/// .map(|(v, _)| v)
/// .map_err(|(e, _)| e);
/// let fut = Box::pin(self.inner.call(req));
/// let f = futures::select(fut, timeout)
/// .map(|either| either.factor_first().0);
///
/// Box::new(f)
/// Box::pin(f)
/// }
/// }
///
@ -146,12 +172,8 @@ use std::task::{Context, Poll};
/// }
/// }
///
/// impl<S, Request> Layer<S, Request> for TimeoutLayer
/// where
/// S: Service<Request>,
/// impl<S> Layer<S> for TimeoutLayer
/// {
/// type Response = S::Response;
/// type Error = S::Error;
/// type Service = Timeout<S>;
///
/// fn layer(&self, service: S) -> Timeout<S> {
@ -184,18 +206,19 @@ pub trait Service<Request> {
/// The future response value.
type Future: Future<Output = Result<Self::Response, Self::Error>>;
/// Returns `Ready` when the service is able to process requests.
/// Returns `Poll::Ready(Ok(()))` when the service is able to process requests.
///
/// If the service is at capacity, then `NotReady` is returned and the task
/// If the service is at capacity, then `Poll::Pending` is returned and the task
/// is notified when the service becomes ready again. This function is
/// expected to be called while on a task.
/// expected to be called while on a task. Generally, this can be done with
/// a simple `futures::future::poll_fn` call.
///
/// If `Err` is returned, the service is no longer able to service requests
/// If `Poll::Ready(Err(_))` is returned, the service is no longer able to service requests
/// and the caller should discard the service instance.
///
/// Once `poll_ready` returns `Ready`, a request may be dispatched to the
/// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a request may be dispatched to the
/// service using `call`. Until a request is dispatched, repeated calls to
/// `poll_ready` must return either `Ready` or `Err`.
/// `poll_ready` must return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))`.
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
/// Process the request and return the response asynchronously.
@ -204,12 +227,12 @@ pub trait Service<Request> {
/// implementations should take care to not call `poll_ready`.
///
/// Before dispatching a request, `poll_ready` must be called and return
/// `Ready`.
/// `Poll::Ready(Ok(()))`.
///
/// # Panics
///
/// Implementations are permitted to panic if `call` is invoked without
/// obtaining `Ready` from `poll_ready`.
/// obtaining `Poll::Ready(Ok(()))` from `poll_ready`.
fn call(&mut self, req: Request) -> Self::Future;
}
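
Because the readiness contract above is easy to get wrong, here is a minimal sketch of the `poll_fn` pattern the docs mention: wait for `poll_ready` to return `Poll::Ready(Ok(()))`, then dispatch exactly one request. The helper name `ready_and_call` is made up for illustration.

```rust
use futures_util::future::poll_fn;
use tower_service::Service;

// Hypothetical helper: drive `poll_ready` to completion, then call once.
async fn ready_and_call<S, Request>(
    service: &mut S,
    request: Request,
) -> Result<S::Response, S::Error>
where
    S: Service<Request>,
{
    // `poll_ready` must yield `Poll::Ready(Ok(()))` before `call` is legal.
    poll_fn(|cx| service.poll_ready(cx)).await?;
    service.call(request).await
}
```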


@ -1,5 +0,0 @@
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (unreleased)


@ -1,35 +0,0 @@
[package]
name = "tower-spawn-ready"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-spawn-ready/0.3.0-alpha.1"
description = """
Drives service readiness via a spawned task
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
futures-core-preview = "=0.3.0-alpha.18"
futures-util-preview = "=0.3.0-alpha.18"
pin-project = "=0.4.0-alpha.11"
tower-service = "=0.3.0-alpha.1"
tower-layer = { version = "=0.3.0-alpha.1", path = "../tower-layer" }
tokio-executor = "=0.2.0-alpha.4"
tokio-sync = "=0.2.0-alpha.4"
[dev-dependencies]
tower-test = { version = "=0.3.0-alpha.1", path = "../tower-test" }
tokio-test = "=0.2.0-alpha.4"


@ -1,25 +0,0 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.


@ -1,13 +0,0 @@
# Tower Spawn Ready
Spawn Ready ensures that its inner service is driven to readiness on an executor. Useful with pooling layers that may poll their inner service infrequently.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.


@ -1,33 +0,0 @@
//! Error types
use std::fmt;
use tokio_executor;
/// Error produced when spawning the worker fails
#[derive(Debug)]
pub struct SpawnError {
inner: tokio_executor::SpawnError,
}
/// Errors produced by `SpawnReady`.
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync>;
// ===== impl SpawnError =====
impl SpawnError {
pub(crate) fn new(inner: tokio_executor::SpawnError) -> Self {
Self { inner }
}
}
impl fmt::Display for SpawnError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.inner.fmt(f)
}
}
impl std::error::Error for SpawnError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&self.inner)
}
}
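
Because `SpawnReady` surfaces failures as the crate-wide boxed `Error`, callers that care about this case recover the concrete type by downcasting, just as the test later in this diff does. A minimal caller-side sketch (the `report` function is hypothetical):

```rust
use tower_spawn_ready::error::SpawnError;

type Error = Box<dyn std::error::Error + Send + Sync>;

// Distinguish "the readiness task could not be spawned" from errors
// produced by the inner service itself.
fn report(err: Error) {
    if let Some(spawn_err) = err.downcast_ref::<SpawnError>() {
        eprintln!("executor rejected the readiness task: {}", spawn_err);
    } else {
        eprintln!("inner service failed: {}", err);
    }
}
```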


@ -1,16 +0,0 @@
#![doc(html_root_url = "https://docs.rs/tower-spawn-ready/0.3.0-alpha.1")]
#![deny(missing_docs, rust_2018_idioms, warnings)]
#![allow(elided_lifetimes_in_paths)]
//! When an underlying service is not ready, drive it to readiness on a
//! background task.
pub mod error;
pub mod future;
mod layer;
mod make;
mod service;
pub use crate::layer::SpawnReadyLayer;
pub use crate::make::{MakeFuture, MakeSpawnReady};
pub use crate::service::SpawnReady;
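
A tiny usage sketch, mirroring how the tests later in this diff construct the wrapper (a `tower-test` mock stands in for a real inner service):

```rust
use tower_spawn_ready::SpawnReady;
use tower_test::mock;

fn main() {
    // Wrapping a service means that whenever it reports `Poll::Pending` from
    // `poll_ready`, a background task is spawned to keep driving it.
    let (inner, _handle) = mock::pair::<&'static str, &'static str>();
    let _service = SpawnReady::new(inner);
}
```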


@ -1,91 +0,0 @@
use futures_util::pin_mut;
use std::{future::Future, thread, time::Duration};
use tokio_executor::{Executor, SpawnError};
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
use tower_service::Service;
use tower_spawn_ready::{error, SpawnReady};
use tower_test::mock;
#[test]
fn when_inner_is_not_ready() {
let mut ex = Exec;
tokio_executor::with_default(&mut ex, || {
task::mock(|cx| {
let (mut service, handle) = new_service();
pin_mut!(handle);
// Make the service NotReady
handle.allow(0);
assert_pending!(service.poll_ready(cx));
// Make the service Ready
handle.allow(1);
thread::sleep(Duration::from_millis(100));
assert_ready_ok!(service.poll_ready(cx));
});
});
}
#[test]
fn when_inner_fails() {
task::mock(|cx| {
let (mut service, handle) = new_service();
pin_mut!(handle);
// Make the service NotReady
handle.allow(0);
handle.send_error("foobar");
assert_eq!(
assert_ready_err!(service.poll_ready(cx)).to_string(),
"foobar"
);
});
}
#[test]
fn when_spawn_fails() {
task::mock(|cx| {
let (service, handle) = mock::pair::<(), ()>();
pin_mut!(handle);
let mut service = SpawnReady::new(service);
// Make the service NotReady so a background task is spawned.
handle.allow(0);
let err = assert_ready_err!(service.poll_ready(cx));
assert!(
err.is::<error::SpawnError>(),
"should be a SpawnError: {:?}",
err
);
});
}
type Mock = mock::Mock<&'static str, &'static str>;
type Handle = mock::Handle<&'static str, &'static str>;
struct Exec;
impl Executor for Exec {
fn spawn(
&mut self,
fut: std::pin::Pin<Box<dyn Future<Output = ()> + Send>>,
) -> Result<(), SpawnError> {
thread::spawn(move || {
let mut mock = tokio_test::task::MockTask::new();
pin_mut!(fut);
while mock.poll(fut.as_mut()).is_pending() {}
});
Ok(())
}
}
fn new_service() -> (SpawnReady<Mock>, Handle) {
let (service, handle) = mock::pair();
let service = SpawnReady::new(service);
(service, handle)
}


@ -1,3 +1,14 @@
# 0.3.0 (December 19, 2019)
- Remove `futures-executor` dependency
- Update to non-alpha versions
- Add `mock::task_fn` util fn
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`

Some files were not shown because too many files have changed in this diff.