deps: move to tokio 0.3, tower 0.4

This change is mostly mechanical, with the exception of the changes to the
`tower-batch` middleware.  This middleware was adapted from `tower::buffer`,
and the `tower::buffer` code was changed to implement its own bounded queue,
because Tokio 0.3 removed the `mpsc::Sender::poll_send` method.  See

ddc64e8d4d

for more context on the Tower changes.  To match Tower as closely as possible
in order to be able to upstream `tower-batch`, those changes are copied from
`tower::Buffer` to `tower-batch`.
This commit is contained in:
Henry de Valence 2020-11-19 11:54:20 -08:00
parent ec00ee4cf0
commit add94c1c45
23 changed files with 401 additions and 201 deletions

251
Cargo.lock generated
View File

@ -378,6 +378,12 @@ version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "bytes"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16"
[[package]]
name = "canonical-path"
version = "2.0.2"
@ -463,6 +469,15 @@ dependencies = [
"bitflags",
]
[[package]]
name = "cloudabi"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4344512281c643ae7638bbabc3af17a11307803ec8f0fcad9fae512a8bf36467"
dependencies = [
"bitflags",
]
[[package]]
name = "color-backtrace"
version = "0.3.0"
@ -1046,7 +1061,7 @@ dependencies = [
"futures-sink",
"futures-task",
"memchr",
"pin-project 1.0.1",
"pin-project 1.0.2",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
@ -1166,7 +1181,7 @@ version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e4728fd124914ad25e99e3d15a9361a879f6620f63cb56bbb08f95abb97a535"
dependencies = [
"bytes",
"bytes 0.5.6",
"fnv",
"futures-core",
"futures-sink",
@ -1174,7 +1189,7 @@ dependencies = [
"http",
"indexmap",
"slab",
"tokio",
"tokio 0.2.23",
"tokio-util 0.3.1",
"tracing",
"tracing-futures",
@ -1230,7 +1245,7 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9"
dependencies = [
"bytes",
"bytes 0.5.6",
"fnv",
"itoa",
]
@ -1241,7 +1256,7 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b"
dependencies = [
"bytes",
"bytes 0.5.6",
"http",
]
@ -1272,7 +1287,7 @@ version = "0.13.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6ad767baac13b44d4529fcf58ba2cd0995e36e7b435bc5b039de6f47e880dbf"
dependencies = [
"bytes",
"bytes 0.5.6",
"futures-channel",
"futures-core",
"futures-util",
@ -1282,9 +1297,9 @@ dependencies = [
"httparse",
"httpdate",
"itoa",
"pin-project 1.0.1",
"pin-project 1.0.2",
"socket2",
"tokio",
"tokio 0.2.23",
"tower-service",
"tracing",
"want",
@ -1362,6 +1377,15 @@ dependencies = [
"str_stack",
]
[[package]]
name = "instant"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "iovec"
version = "0.1.4"
@ -1466,6 +1490,15 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "lock_api"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312"
dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.11"
@ -1558,7 +1591,7 @@ checksum = "f3fc63816bd5f8bde5eb31ce471f9633adc69ba1c55b44191b4d5fc7e263e8ab"
dependencies = [
"log",
"metrics-core",
"tokio",
"tokio 0.2.23",
]
[[package]]
@ -1614,15 +1647,15 @@ dependencies = [
"metrics-observer-prometheus",
"metrics-observer-yaml",
"metrics-util",
"parking_lot",
"parking_lot 0.10.2",
"quanta",
]
[[package]]
name = "metrics-util"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d11f8090a8886339f9468a04eeea0711e4cf27538b134014664308041307a1c5"
checksum = "277619f040719a5a23d75724586d5601286e8fa53451cfaaca3b8c627c2c2378"
dependencies = [
"crossbeam-epoch 0.8.2",
"serde",
@ -1658,26 +1691,16 @@ dependencies = [
]
[[package]]
name = "mio-named-pipes"
version = "0.1.7"
name = "mio"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656"
checksum = "f33bc887064ef1fd66020c9adfc45bb9f33d75a42096c81e7c56c65b75dd1a8b"
dependencies = [
"log",
"mio",
"miow 0.3.5",
"winapi 0.3.9",
]
[[package]]
name = "mio-uds"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0"
dependencies = [
"iovec",
"libc",
"mio",
"log",
"miow 0.3.6",
"ntapi",
"winapi 0.3.9",
]
[[package]]
@ -1694,9 +1717,9 @@ dependencies = [
[[package]]
name = "miow"
version = "0.3.5"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07b88fb9795d4d36d62a012dfbf49a8f5cf12751f36d31a9dbe66d528e58979e"
checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897"
dependencies = [
"socket2",
"winapi 0.3.9",
@ -1739,6 +1762,15 @@ dependencies = [
"version_check 0.9.2",
]
[[package]]
name = "ntapi"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "num-format"
version = "0.4.0"
@ -1844,8 +1876,19 @@ version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e"
dependencies = [
"lock_api",
"parking_lot_core",
"lock_api 0.3.4",
"parking_lot_core 0.7.2",
]
[[package]]
name = "parking_lot"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"
dependencies = [
"instant",
"lock_api 0.4.2",
"parking_lot_core 0.8.0",
]
[[package]]
@ -1855,10 +1898,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3"
dependencies = [
"cfg-if 0.1.10",
"cloudabi",
"cloudabi 0.0.3",
"libc",
"redox_syscall",
"smallvec 1.4.2",
"smallvec 1.5.0",
"winapi 0.3.9",
]
[[package]]
name = "parking_lot_core"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c361aa727dd08437f2f1447be8b59a33b0edd15e0fcee698f935613d9efbca9b"
dependencies = [
"cfg-if 0.1.10",
"cloudabi 0.1.0",
"instant",
"libc",
"redox_syscall",
"smallvec 1.5.0",
"winapi 0.3.9",
]
@ -1885,11 +1943,11 @@ dependencies = [
[[package]]
name = "pin-project"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee41d838744f60d959d7074e3afb6b35c7456d0f61cad38a24e35e6553f73841"
checksum = "9ccc2237c2c489783abd8c4c80e5450fc0e98644555b1364da68cc29aa151ca7"
dependencies = [
"pin-project-internal 1.0.1",
"pin-project-internal 1.0.2",
]
[[package]]
@ -1905,9 +1963,9 @@ dependencies = [
[[package]]
name = "pin-project-internal"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81a4ffa594b66bff340084d4081df649a7dc049ac8d7fc458d8e628bfbbb2f86"
checksum = "f8e8d2bf0b23038a4424865103a4df472855692821aab4e4f5c3312d461d9e5f"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
@ -1920,6 +1978,12 @@ version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c917123afa01924fc84bb20c4c03f004d9c38e5127e3c039bbf7f4b9c76a2f6b"
[[package]]
name = "pin-project-lite"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c"
[[package]]
name = "pin-utils"
version = "0.1.0"
@ -2559,9 +2623,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.4.2"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbee7696b84bbf3d89a1c2eccff0850e3047ed46bfcd2e92c29a2d074d57e252"
checksum = "7acad6f34eb9e8a259d3283d1e8c1d34d7415943d4895f65cc73813c7396fc85"
[[package]]
name = "socket2"
@ -2717,9 +2781,9 @@ dependencies = [
[[package]]
name = "termcolor"
version = "1.1.0"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f"
checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4"
dependencies = [
"winapi-util",
]
@ -2794,18 +2858,33 @@ version = "0.2.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6d7ad61edd59bfcc7e80dababf0f4aed2e6d5e0ba1659356ae889752dfc12ff"
dependencies = [
"bytes",
"bytes 0.5.6",
"fnv",
"futures-core",
"iovec",
"lazy_static",
"memchr",
"mio 0.6.22",
"pin-project-lite 0.1.11",
"slab",
]
[[package]]
name = "tokio"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dfe2523e6fa84ddf5e688151d4e5fddc51678de9752c6512a24714c23818d61"
dependencies = [
"autocfg",
"bytes 0.6.0",
"futures-core",
"lazy_static",
"libc",
"memchr",
"mio",
"mio-named-pipes",
"mio-uds",
"mio 0.7.6",
"num_cpus",
"pin-project-lite",
"parking_lot 0.11.1",
"pin-project-lite 0.2.0",
"signal-hook-registry",
"slab",
"tokio-macros",
@ -2815,41 +2894,41 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "0.2.6"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a"
checksum = "21d30fdbb5dc2d8f91049691aa1a9d4d4ae422a21c334ce8936e5886d30c5c45"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.48",
]
[[package]]
name = "tokio-util"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499"
dependencies = [
"bytes",
"bytes 0.5.6",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
"pin-project-lite 0.1.11",
"tokio 0.2.23",
]
[[package]]
name = "tokio-util"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73af76301319bcacf00d26d3c75534ef248dcad7ceaf36d93ec902453c3b1706"
dependencies = [
"bytes 0.6.0",
"futures-core",
"futures-sink",
"log",
"pin-project-lite 0.1.11",
"tokio 0.3.4",
]
[[package]]
@ -2863,14 +2942,14 @@ dependencies = [
[[package]]
name = "tower"
version = "0.3.1"
source = "git+https://github.com/tower-rs/tower?rev=1a84543#1a845433177b31ebc6daff765ee81468c42d6676"
version = "0.4.0"
source = "git+https://github.com/tower-rs/tower?rev=5e1e07744820028877654c336a3b9fe057bf46f1#5e1e07744820028877654c336a3b9fe057bf46f1"
dependencies = [
"futures-core",
"futures-util",
"hdrhistogram",
"pin-project 0.4.27",
"tokio",
"pin-project 1.0.2",
"tokio 0.3.4",
"tower-layer",
"tower-service",
"tracing",
@ -2886,7 +2965,7 @@ dependencies = [
"futures-core",
"pin-project 0.4.27",
"rand 0.7.3",
"tokio",
"tokio 0.3.4",
"tower",
"tower-fallback",
"tracing",
@ -2900,7 +2979,7 @@ version = "0.1.0"
dependencies = [
"futures-core",
"pin-project 0.4.27",
"tokio",
"tokio 0.3.4",
"tower",
"tracing",
"zebra-test",
@ -2909,7 +2988,7 @@ dependencies = [
[[package]]
name = "tower-layer"
version = "0.3.0"
source = "git+https://github.com/tower-rs/tower?rev=1a84543#1a845433177b31ebc6daff765ee81468c42d6676"
source = "git+https://github.com/tower-rs/tower?rev=5e1e07744820028877654c336a3b9fe057bf46f1#5e1e07744820028877654c336a3b9fe057bf46f1"
[[package]]
name = "tower-service"
@ -2937,7 +3016,7 @@ checksum = "b0987850db3733619253fe60e17cb59b82d37c7e6c0236bb81e4d6b87c879f27"
dependencies = [
"cfg-if 0.1.10",
"log",
"pin-project-lite",
"pin-project-lite 0.1.11",
"tracing-attributes",
"tracing-core",
]
@ -3045,7 +3124,7 @@ dependencies = [
"serde",
"serde_json",
"sharded-slab",
"smallvec 1.4.2",
"smallvec 1.5.0",
"thread_local",
"tracing",
"tracing-core",
@ -3088,9 +3167,9 @@ dependencies = [
[[package]]
name = "unicode-normalization"
version = "0.1.14"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7f98e67a4d84f730d343392f9bfff7d21e3fca562b9cb7a43b768350beeddc6"
checksum = "a13e63ab62dbe32aeee58d1c5408d35c36c392bba5d9d3142287219721afe606"
dependencies = [
"tinyvec",
]
@ -3340,7 +3419,7 @@ dependencies = [
"serde",
"spandoc",
"thiserror",
"tokio",
"tokio 0.3.4",
"tower",
"tower-batch",
"tower-fallback",
@ -3361,7 +3440,7 @@ version = "3.0.0-alpha.0"
dependencies = [
"bitflags",
"byteorder",
"bytes",
"bytes 0.6.0",
"chrono",
"futures",
"hex",
@ -3373,8 +3452,8 @@ dependencies = [
"rand 0.7.3",
"serde",
"thiserror",
"tokio",
"tokio-util 0.2.0",
"tokio 0.3.4",
"tokio-util 0.5.0",
"tower",
"tracing",
"tracing-error",
@ -3419,7 +3498,7 @@ dependencies = [
"spandoc",
"tempdir",
"thiserror",
"tokio",
"tokio 0.3.4",
"tower",
"tracing",
"tracing-error",
@ -3442,7 +3521,7 @@ dependencies = [
"spandoc",
"tempdir",
"thiserror",
"tokio",
"tokio 0.3.4",
"tower",
"tracing",
"tracing-error",
@ -3484,7 +3563,7 @@ dependencies = [
"serde",
"tempdir",
"thiserror",
"tokio",
"tokio 0.3.4",
"toml",
"tower",
"tracing",

View File

@ -21,4 +21,4 @@ panic = "abort"
panic = "abort"
[patch.crates-io]
tower = { git = "https://github.com/tower-rs/tower", rev = "1a84543" }
tower = { git = "https://github.com/tower-rs/tower", rev = "5e1e07744820028877654c336a3b9fe057bf46f1" }

View File

@ -6,8 +6,8 @@ license = "MIT"
edition = "2018"
[dependencies]
tokio = { version = "0.2.22", features = ["time", "sync", "stream", "tracing"] }
tower = { version = "0.3", features = ["util", "buffer"] }
tokio = { version = "0.3", features = ["time", "sync", "stream", "tracing"] }
tower = { version = "0.4", features = ["util", "buffer"] }
futures-core = "0.3.6"
pin-project = "0.4.27"
tracing = "0.1.21"
@ -17,7 +17,7 @@ futures = "0.3.7"
[dev-dependencies]
ed25519-zebra = "2.1.0"
rand = "0.7"
tokio = { version = "0.2", features = ["full"]}
tokio = { version = "0.3", features = ["full"]}
tracing = "0.1.21"
zebra-test = { path = "../zebra-test/" }
tower-fallback = { path = "../tower-fallback/" }

View File

@ -89,6 +89,7 @@ pub mod error;
pub mod future;
mod layer;
mod message;
mod semaphore;
mod service;
mod worker;

View File

@ -7,6 +7,7 @@ pub(crate) struct Message<Request, Fut> {
pub(crate) request: Request,
pub(crate) tx: Tx<Fut>,
pub(crate) span: tracing::Span,
pub(super) _permit: crate::semaphore::Permit,
}
/// Response sender

View File

@ -0,0 +1,78 @@
// Copied from tower/src/semaphore.rs
// When/if tower-batch is upstreamed, delete this file
// and use the common tower semaphore implementation
pub(crate) use self::sync::OwnedSemaphorePermit as Permit;
use futures_core::ready;
use std::{
fmt,
future::Future,
mem,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync;
#[derive(Debug)]
pub(crate) struct Semaphore {
semaphore: Arc<sync::Semaphore>,
state: State,
}
enum State {
Waiting(Pin<Box<dyn Future<Output = Permit> + Send + Sync + 'static>>),
Ready(Permit),
Empty,
}
impl Semaphore {
pub(crate) fn new(permits: usize) -> Self {
Self {
semaphore: Arc::new(sync::Semaphore::new(permits)),
state: State::Empty,
}
}
pub(crate) fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll<()> {
loop {
self.state = match self.state {
State::Ready(_) => return Poll::Ready(()),
State::Waiting(ref mut fut) => {
let permit = ready!(Pin::new(fut).poll(cx));
State::Ready(permit)
}
State::Empty => State::Waiting(Box::pin(self.semaphore.clone().acquire_owned())),
};
}
}
pub(crate) fn take_permit(&mut self) -> Option<Permit> {
if let State::Ready(permit) = mem::replace(&mut self.state, State::Empty) {
return Some(permit);
}
None
}
}
impl Clone for Semaphore {
fn clone(&self) -> Self {
Self {
semaphore: self.semaphore.clone(),
state: State::Empty,
}
}
}
impl fmt::Debug for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
State::Waiting(_) => f
.debug_tuple("State::Waiting")
.field(&format_args!("..."))
.finish(),
State::Ready(ref r) => f.debug_tuple("State::Ready").field(&r).finish(),
State::Empty => f.debug_tuple("State::Empty").finish(),
}
}
}

View File

@ -5,6 +5,7 @@ use super::{
BatchControl,
};
use crate::semaphore::Semaphore;
use futures_core::ready;
use std::task::{Context, Poll};
use tokio::sync::{mpsc, oneshot};
@ -18,7 +19,19 @@ pub struct Batch<T, Request>
where
T: Service<BatchControl<Request>>,
{
tx: mpsc::Sender<Message<Request, T::Future>>,
// Note: this actually _is_ bounded, but rather than using Tokio's unbounded
// channel, we use tokio's semaphore separately to implement the bound.
tx: mpsc::UnboundedSender<Message<Request, T::Future>>,
// When the buffer's channel is full, we want to exert backpressure in
// `poll_ready`, so that callers such as load balancers could choose to call
// another service rather than waiting for buffer capacity.
//
// Unfortunately, this can't be done easily using Tokio's bounded MPSC
// channel, because it doesn't expose a polling-based interface, only an
// `async fn ready`, which borrows the sender. Therefore, we implement our
// own bounded MPSC on top of the unbounded channel, using a semaphore to
// limit how many items are in the channel.
semaphore: Semaphore,
handle: Handle,
}
@ -45,10 +58,16 @@ where
Request: Send + 'static,
{
// XXX(hdevalence): is this bound good
let (tx, rx) = mpsc::channel(1);
let bound = 1;
let (tx, rx) = mpsc::unbounded_channel();
let (handle, worker) = Worker::new(service, rx, max_items, max_latency);
tokio::spawn(worker.run());
Batch { tx, handle }
let semaphore = Semaphore::new(bound);
Batch {
tx,
handle,
semaphore,
}
}
fn get_worker_error(&self) -> crate::BoxError {
@ -66,40 +85,43 @@ where
type Future = ResponseFuture<T::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// If the inner service has errored, then we error here.
if ready!(self.tx.poll_ready(cx)).is_err() {
Poll::Ready(Err(self.get_worker_error()))
} else {
Poll::Ready(Ok(()))
// First, check if the worker is still alive.
if self.tx.is_closed() {
// If the inner service has errored, then we error here.
return Poll::Ready(Err(self.get_worker_error()));
}
// Then, poll to acquire a semaphore permit. If we acquire a permit,
// then there's enough buffer capacity to send a new request. Otherwise,
// we need to wait for capacity.
ready!(self.semaphore.poll_acquire(cx));
Poll::Ready(Ok(()))
}
fn call(&mut self, request: Request) -> Self::Future {
// TODO:
// ideally we'd poll_ready again here so we don't allocate the oneshot
// if the try_send is about to fail, but sadly we can't call poll_ready
// outside of task context.
let (tx, rx) = oneshot::channel();
tracing::trace!("sending request to buffer worker");
let _permit = self
.semaphore
.take_permit()
.expect("buffer full; poll_ready must be called first");
// get the current Span so that we can explicitly propagate it to the worker
// if we didn't do this, events on the worker related to this span wouldn't be counted
// towards that span since the worker would have no way of entering it.
let span = tracing::Span::current();
tracing::trace!(parent: &span, "sending request to batch worker");
match self.tx.try_send(Message { request, span, tx }) {
Err(mpsc::error::TrySendError::Closed(_)) => {
ResponseFuture::failed(self.get_worker_error())
}
Err(mpsc::error::TrySendError::Full(_)) => {
// When `mpsc::Sender::poll_ready` returns `Ready`, a slot
// in the channel is reserved for the handle. Other `Sender`
// handles may not send a message using that slot. This
// guarantees capacity for `request`.
//
// Given this, the only way to hit this code path is if
// `poll_ready` has not been called & `Ready` returned.
panic!("buffer full; poll_ready must be called first");
}
// If we've made it here, then a semaphore permit has already been
// acquired, so we can freely allocate a oneshot.
let (tx, rx) = oneshot::channel();
match self.tx.send(Message {
request,
span,
tx,
_permit,
}) {
Err(_) => ResponseFuture::failed(self.get_worker_error()),
Ok(_) => ResponseFuture::new(rx),
}
}
@ -113,6 +135,7 @@ where
Self {
tx: self.tx.clone(),
handle: self.handle.clone(),
semaphore: self.semaphore.clone(),
}
}
}

View File

@ -1,18 +1,20 @@
use std::sync::{Arc, Mutex};
use futures::future::TryFutureExt;
use pin_project::pin_project;
use tokio::{
stream::StreamExt,
sync::mpsc,
time::{sleep, Sleep},
};
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
use super::{
error::{Closed, ServiceError},
message::{self, Message},
BatchControl,
};
use futures::future::TryFutureExt;
use pin_project::pin_project;
use std::sync::{Arc, Mutex};
use tokio::{
stream::StreamExt,
sync::mpsc,
time::{delay_for, Delay},
};
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
/// Task that handles processing the buffer. This type should not be used
/// directly, instead `Buffer` requires an `Executor` that can accept this task.
@ -28,7 +30,7 @@ where
T: Service<BatchControl<Request>>,
T::Error: Into<crate::BoxError>,
{
rx: mpsc::Receiver<Message<Request, T::Future>>,
rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
service: T,
failed: Option<ServiceError>,
handle: Handle,
@ -49,7 +51,7 @@ where
{
pub(crate) fn new(
service: T,
rx: mpsc::Receiver<Message<Request, T::Future>>,
rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
max_items: usize,
max_latency: std::time::Duration,
) -> (Handle, Worker<T, Request>) {
@ -103,12 +105,11 @@ where
}
pub async fn run(mut self) {
use futures::future::Either::{Left, Right};
// The timer is started when the first entry of a new batch is
// submitted, so that the batch latency of all entries is at most
// self.max_latency. However, we don't keep the timer running unless
// there is a pending request to prevent wakeups on idle services.
let mut timer: Option<Delay> = None;
let mut timer: Option<Sleep> = None;
let mut pending_items = 0usize;
loop {
match timer {
@ -120,40 +121,42 @@ where
// Apply the provided span to request processing
.instrument(span)
.await;
timer = Some(delay_for(self.max_latency));
timer = Some(sleep(self.max_latency));
pending_items = 1;
}
// No more messages, ever.
None => return,
},
Some(delay) => {
Some(mut sleep) => {
// Wait on either a new message or the batch timer.
match futures::future::select(self.rx.next(), delay).await {
Left((Some(msg), delay)) => {
let span = msg.span;
self.process_req(msg.request, msg.tx)
// Apply the provided span to request processing.
.instrument(span)
.await;
pending_items += 1;
// Check whether we have too many pending items.
if pending_items >= self.max_items {
// XXX(hdevalence): what span should instrument this?
self.flush_service().await;
// Now we have an empty batch.
timer = None;
pending_items = 0;
} else {
// The timer is still running, set it back!
timer = Some(delay);
tokio::select! {
maybe_msg = self.rx.recv() => match maybe_msg {
Some(msg) => {
let span = msg.span;
self.process_req(msg.request, msg.tx)
// Apply the provided span to request processing.
.instrument(span)
.await;
pending_items += 1;
// Check whether we have too many pending items.
if pending_items >= self.max_items {
// XXX(hdevalence): what span should instrument this?
self.flush_service().await;
// Now we have an empty batch.
timer = None;
pending_items = 0;
} else {
// The timer is still running, set it back!
timer = Some(sleep);
}
}
}
// No more messages, ever.
Left((None, _delay)) => {
return;
}
// The batch timer elapsed.
Right(((), _next)) => {
None => {
// No more messages, ever.
return;
}
},
() = &mut sleep => {
// The batch timer elapsed.
// XXX(hdevalence): what span should instrument this?
self.flush_service().await;
timer = None;

View File

@ -10,7 +10,7 @@ use color_eyre::{eyre::eyre, Report};
use ed25519_zebra::*;
use futures::stream::{FuturesUnordered, StreamExt};
use rand::thread_rng;
use tokio::sync::broadcast::{channel, RecvError, Sender};
use tokio::sync::broadcast::{channel, error::RecvError, Sender};
use tower::{Service, ServiceExt};
use tower_batch::{Batch, BatchControl};
use tower_fallback::Fallback;

View File

@ -6,11 +6,11 @@ license = "MIT"
edition = "2018"
[dependencies]
tower = "0.3"
tower = "0.4"
futures-core = "0.3.6"
pin-project = "0.4.27"
tracing = "0.1"
[dev-dependencies]
zebra-test = { path = "../zebra-test/" }
tokio = { version = "0.2", features = ["full"]}
tokio = { version = "0.3", features = ["full"]}

View File

@ -19,8 +19,8 @@ futures = "0.3.7"
futures-util = "0.3.6"
metrics = "0.12"
thiserror = "1.0.22"
tokio = { version = "0.2.22", features = ["time", "sync", "stream", "tracing"] }
tower = { version = "0.3", features = ["timeout", "util", "buffer"] }
tokio = { version = "0.3", features = ["time", "sync", "stream", "tracing"] }
tower = { version = "0.4", features = ["timeout", "util", "buffer"] }
tower-util = "0.3"
tracing = "0.1.21"
tracing-futures = "0.2.4"
@ -34,7 +34,7 @@ zebra-script = { path = "../zebra-script" }
[dev-dependencies]
rand = "0.7"
spandoc = "0.2"
tokio = { version = "0.2", features = ["full"] }
tokio = { version = "0.3", features = ["full"] }
tracing-error = "0.1.2"
tracing-subscriber = "0.2.15"

View File

@ -14,7 +14,7 @@ use futures::future::{ready, Ready};
use once_cell::sync::Lazy;
use rand::thread_rng;
use tokio::sync::broadcast::{channel, RecvError, Sender};
use tokio::sync::broadcast::{channel, error::RecvError, Sender};
use tower::{util::ServiceFn, Service};
use tower_batch::{Batch, BatchControl};
use tower_fallback::Fallback;

View File

@ -10,7 +10,7 @@ edition = "2018"
[dependencies]
bitflags = "1.2"
byteorder = "1.3"
bytes = "0.5"
bytes = "0.6"
chrono = "0.4"
hex = "0.4"
# indexmap has rayon support for parallel iteration,
@ -22,9 +22,9 @@ serde = { version = "1", features = ["serde_derive"] }
thiserror = "1"
futures = "0.3"
tokio = { version = "0.2.22", features = ["net", "time", "stream", "tracing", "macros"] }
tokio-util = { version = "0.2", features = ["codec"] }
tower = { version = "0.3", features = ["retry", "discover", "load", "load-shed", "timeout", "util", "buffer"] }
tokio = { version = "0.3", features = ["net", "time", "stream", "tracing", "macros"] }
tokio-util = { version = "0.5", features = ["codec"] }
tower = { version = "0.4", features = ["retry", "discover", "load", "load-shed", "timeout", "util", "buffer"] }
metrics = "0.12"
tracing = "0.1"

View File

@ -93,7 +93,7 @@ mod tests {
use futures::stream::StreamExt;
use tokio_util::codec::Framed;
let mut listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let listen_addr = listener.local_addr().unwrap();
let conn = tokio::net::TcpStream::connect(listen_addr).await.unwrap();

View File

@ -23,7 +23,7 @@ use futures::{
prelude::*,
stream::Stream,
};
use tokio::time::{delay_for, Delay};
use tokio::time::{sleep, Sleep};
use tower::Service;
use tracing_futures::Instrument;
@ -221,7 +221,7 @@ pub struct Connection<S, Tx> {
/// A timeout for a client request. This is stored separately from
/// State so that we can move the future out of it independently of
/// other state handling.
pub(super) request_timer: Option<Delay>,
pub(super) request_timer: Option<Sleep>,
pub(super) svc: S,
pub(super) client_rx: mpsc::Receiver<ClientRequest>,
/// A slot for an error shared between the Connection and the Client that uses it.
@ -553,7 +553,7 @@ where
} {
Ok(new_state) => {
self.state = new_state;
self.request_timer = Some(delay_for(constants::REQUEST_TIMEOUT));
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
}
Err(e) => self.fail_with(e),
}

View File

@ -211,7 +211,7 @@ where
S: Service<(TcpStream, SocketAddr), Response = peer::Client, Error = BoxError> + Clone,
S::Future: Send + 'static,
{
let mut listener = TcpListener::bind(addr).await?;
let listener = TcpListener::bind(addr).await?;
let local_addr = listener.local_addr()?;
info!("Opened Zcash protocol endpoint at {}", local_addr);
loop {

View File

@ -1,8 +1,7 @@
//! Inventory Registry Implementation
//!
//! [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html
use crate::{protocol::external::InventoryHash, BoxError};
use futures::Stream;
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
@ -10,17 +9,20 @@ use std::{
task::{Context, Poll},
time::Duration,
};
use futures::{Stream, StreamExt};
use tokio::{
sync::broadcast,
time::{self, Interval},
};
use crate::{protocol::external::InventoryHash, BoxError};
/// An Inventory Registry for tracking recent inventory advertisements by peer.
///
/// For more details please refer to the [RFC].
///
/// [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html
#[derive(Debug)]
pub struct InventoryRegistry {
/// Map tracking the inventory advertisements from the current interval
/// period
@ -28,18 +30,33 @@ pub struct InventoryRegistry {
/// Map tracking inventory advertisements from the previous interval period
prev: HashMap<InventoryHash, HashSet<SocketAddr>>,
/// Stream of incoming inventory hashes to register
inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>,
inv_stream: Pin<
Box<
dyn Stream<Item = Result<(InventoryHash, SocketAddr), broadcast::error::RecvError>>
+ Send
+ 'static,
>,
>,
/// Interval tracking how frequently we should rotate our maps
interval: Interval,
}
impl std::fmt::Debug for InventoryRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InventoryRegistry")
.field("current", &self.current)
.field("prev", &self.prev)
.finish()
}
}
impl InventoryRegistry {
/// Returns an Inventory Registry
pub fn new(inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>) -> Self {
Self {
current: Default::default(),
prev: Default::default(),
inv_stream,
inv_stream: inv_stream.into_stream().boxed(),
interval: time::interval(Duration::from_secs(75)),
}
}
@ -60,7 +77,7 @@ impl InventoryRegistry {
/// - rotates HashMaps based on interval events
/// - drains the inv_stream channel and registers all advertised inventory
pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Result<(), BoxError> {
while let Poll::Ready(_) = self.interval.poll_tick(cx) {
while let Poll::Ready(_) = Pin::new(&mut self.interval).poll_next(cx) {
self.rotate();
}
@ -80,7 +97,7 @@ impl InventoryRegistry {
// rather than propagating it through the peer set's Service::poll_ready
// implementation, where reporting a failure means reporting a permanent
// failure of the peer set.
use broadcast::RecvError;
use broadcast::error::RecvError;
while let Poll::Ready(Some(channel_result)) = Pin::new(&mut self.inv_stream).poll_next(cx) {
match channel_result {
Ok((hash, addr)) => self.register(hash, addr),

View File

@ -104,11 +104,10 @@ impl Builder {
// ======== Encoding =========
impl Encoder for Codec {
type Item = Message;
impl Encoder<Message> for Codec {
type Error = Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> {
use Error::Parse;
// XXX(HACK): this is inefficient and does an extra allocation.
// instead, we should have a size estimator for the message, reserve
@ -587,7 +586,7 @@ mod tests {
let services = PeerServices::NODE_NETWORK;
let timestamp = Utc.timestamp(1_568_000_000, 0);
let mut rt = Runtime::new().unwrap();
let rt = Runtime::new().unwrap();
let v = Message::Version {
version: crate::constants::CURRENT_VERSION,
@ -634,7 +633,7 @@ mod tests {
fn filterload_message_round_trip() {
zebra_test::init();
let mut rt = Runtime::new().unwrap();
let rt = Runtime::new().unwrap();
let v = Message::FilterLoad {
filter: Filter(vec![0; 35999]),
@ -670,7 +669,7 @@ mod tests {
fn filterload_message_too_large_round_trip() {
zebra_test::init();
let mut rt = Runtime::new().unwrap();
let rt = Runtime::new().unwrap();
let v = Message::FilterLoad {
filter: Filter(vec![0; 40000]),
@ -706,7 +705,7 @@ mod tests {
use zebra_chain::serialization::ZcashDeserializeInto;
zebra_test::init();
let mut rt = Runtime::new().unwrap();
let rt = Runtime::new().unwrap();
// make tests with a Tx message
let tx = zebra_test::vectors::DUMMY_TX1

View File

@ -18,11 +18,11 @@ serde = { version = "1", features = ["serde_derive"] }
futures = "0.3.7"
metrics = "0.12"
tower = { version = "0.3.1", features = ["buffer", "util"] }
tower = { version = "0.4", features = ["buffer", "util"] }
tracing = "0.1"
tracing-error = "0.1.2"
thiserror = "1.0.22"
tokio = { version = "0.2.22", features = ["sync"] }
tokio = { version = "0.3", features = ["sync"] }
displaydoc = "0.1.7"
rocksdb = "0.15.0"
tempdir = "0.3.7"
@ -34,6 +34,6 @@ zebra-test = { path = "../zebra-test/" }
once_cell = "1.5"
spandoc = "0.2"
tempdir = "0.3.7"
tokio = { version = "0.2.22", features = ["full"] }
tokio = { version = "0.3", features = ["full"] }
proptest = "0.10.1"
primitive-types = "0.7.3"

View File

@ -10,7 +10,7 @@ edition = "2018"
[dependencies]
hex = "0.4.2"
lazy_static = "1.4.0"
tower = { version = "0.3.1", features = ["util"] }
tower = { version = "0.4", features = ["util"] }
futures = "0.3.7"
color-eyre = "0.5.7"
tracing = "0.1.21"
@ -25,4 +25,4 @@ proptest = "0.10.1"
tempdir = "0.3.7"
[dev-dependencies]
tokio = { version = "0.2", features = ["full"] }
tokio = { version = "0.3", features = ["full"] }

View File

@ -21,8 +21,8 @@ rand = "0.7"
hyper = "0.13.9"
futures = "0.3"
tokio = { version = "0.2.22", features = ["time", "rt-threaded", "stream", "macros", "tracing", "signal"] }
tower = { version = "0.3", features = ["hedge", "limit"] }
tokio = { version = "0.3", features = ["time", "rt-multi-thread", "stream", "macros", "tracing", "signal"] }
tower = { version = "0.4", features = ["hedge", "limit"] }
pin-project = "0.4.23"
color-eyre = { version = "0.5.7", features = ["issue-url"] }

View File

@ -5,7 +5,7 @@ use futures::{
future::FutureExt,
stream::{FuturesUnordered, StreamExt},
};
use tokio::time::delay_for;
use tokio::time::sleep;
use tower::{
builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
Service, ServiceExt,
@ -153,7 +153,7 @@ where
// due to protocol limitations
self.request_genesis().await?;
// Distinguishes a restart from a start, so we don't delay when starting
// Distinguishes a restart from a start, so we don't sleep when starting
// the sync process, but we can keep restart logic in one place.
let mut started_once = false;
@ -163,7 +163,7 @@ where
self.prospective_tips = HashSet::new();
self.downloads.cancel_all();
self.update_metrics();
delay_for(SYNC_RESTART_TIMEOUT).await;
sleep(SYNC_RESTART_TIMEOUT).await;
} else {
started_once = true;
}

View File

@ -22,9 +22,8 @@ impl TokioComponent {
pub fn new() -> Result<Self, FrameworkError> {
Ok(Self {
rt: Some(
tokio::runtime::Builder::new()
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.threaded_scheduler()
.build()
.unwrap(),
),