From cf5691fe2a164b686600218082f98e69d3e819ee Mon Sep 17 00:00:00 2001 From: Daniel Fox Franke Date: Tue, 14 Jan 2020 23:11:42 -0500 Subject: [PATCH] Migrate to core::futures (#65) * Migrate to core::futures This drops all dependencies on the 'futures' crate, and upgrades to hyper-0.13 and tokio-0.2. Methods that return futures now use async/await. Since hyper has removed hyper_run(), I have in turn removed the HttpExporter::run() method. The user should just call async_run() and spawn or await the result however they wish. * Bump CI minimum rust to 1.39 for async/await Co-authored-by: Toby Lawrence --- ci/azure-test-minimum.yaml | 2 +- metrics-exporter-http/Cargo.toml | 2 +- metrics-exporter-http/src/lib.rs | 84 +++++++++--------------------- metrics-exporter-log/Cargo.toml | 3 +- metrics-exporter-log/src/lib.rs | 26 +++++---- metrics-runtime/Cargo.toml | 2 +- metrics-runtime/examples/facade.rs | 6 ++- 7 files changed, 46 insertions(+), 79 deletions(-) diff --git a/ci/azure-test-minimum.yaml b/ci/azure-test-minimum.yaml index 7a78772..dd8c868 100644 --- a/ci/azure-test-minimum.yaml +++ b/ci/azure-test-minimum.yaml @@ -15,6 +15,6 @@ jobs: steps: - template: azure-install-rust.yml parameters: - rust_version: 1.34.0 + rust_version: 1.39.0 - script: cargo test displayName: cargo test diff --git a/metrics-exporter-http/Cargo.toml b/metrics-exporter-http/Cargo.toml index 033cc43..e03e248 100644 --- a/metrics-exporter-http/Cargo.toml +++ b/metrics-exporter-http/Cargo.toml @@ -17,5 +17,5 @@ keywords = ["metrics", "metrics-core", "exporter", "http"] [dependencies] metrics-core = { path = "../metrics-core", version = "^0.5" } -hyper = "^0.12" +hyper = "^0.13" log = "^0.4" diff --git a/metrics-exporter-http/src/lib.rs b/metrics-exporter-http/src/lib.rs index 85bf357..a88af00 100644 --- a/metrics-exporter-http/src/lib.rs +++ b/metrics-exporter-http/src/lib.rs @@ -3,20 +3,12 @@ //! This exporter can utilize observers that are able to be converted to a textual representation //! via [`Drain`]. It will respond to any requests, regardless of the method or path. //! -//! # Run Modes -//! - `run` can be used to block the current thread, running the HTTP server on the configured -//! address -//! - `into_future` will return a [`Future`] that when driven will run the HTTP server on the -//! configured address +//! Awaiting on `async_run` will drive an HTTP server listening on the configured address. #![deny(missing_docs)] -#[macro_use] -extern crate log; -use hyper::rt::run as hyper_run; use hyper::{ - rt::Future, - service::service_fn_ok, - {Body, Response, Server}, + service::{make_service_fn, service_fn}, + {Body, Error, Response, Server}, }; use metrics_core::{Builder, Drain, Observe, Observer}; use std::{net::SocketAddr, sync::Arc}; @@ -45,55 +37,31 @@ where } } - /// Run the exporter on the current thread. - /// - /// This starts an HTTP server on the `address` the exporter was originally configured with, + /// Starts an HTTP server on the `address` the exporter was originally configured with, /// responding to any request with the output of the configured observer. - pub fn run(self) { - let server = self.into_future(); - hyper_run(server); - } + pub async fn async_run(self) -> hyper::error::Result<()> { + let builder = Arc::new(self.builder); + let controller = Arc::new(self.controller); - /// Converts this exporter into a future which can be driven externally. - /// - /// This starts an HTTP server on the `address` the exporter was originally configured with, - /// responding to any request with the output of the configured observer. - pub fn into_future(self) -> impl Future { - let controller = self.controller; - let builder = self.builder; - let address = self.address; + let make_svc = make_service_fn(move |_| { + let builder = builder.clone(); + let controller = controller.clone(); - build_hyper_server(controller, builder, address) + async move { + Ok::<_, Error>(service_fn(move |_| { + let builder = builder.clone(); + let controller = controller.clone(); + + async move { + let mut observer = builder.build(); + controller.observe(&mut observer); + let output = observer.drain(); + Ok::<_, Error>(Response::new(Body::from(output))) + } + })) + } + }); + + Server::bind(&self.address).serve(make_svc).await } } - -fn build_hyper_server( - controller: C, - builder: B, - address: SocketAddr, -) -> impl Future -where - C: Observe + Send + Sync + 'static, - B: Builder + Send + Sync + 'static, - B::Output: Drain + Observer, -{ - let builder = Arc::new(builder); - let controller = Arc::new(controller); - - let service = move || { - let controller2 = controller.clone(); - let builder = builder.clone(); - - service_fn_ok(move |_| { - let mut observer = builder.build(); - - controller2.observe(&mut observer); - let output = observer.drain(); - Response::new(Body::from(output)) - }) - }; - - Server::bind(&address) - .serve(service) - .map_err(|e| error!("http exporter server error: {}", e)) -} diff --git a/metrics-exporter-log/Cargo.toml b/metrics-exporter-log/Cargo.toml index 4da3875..f1ed946 100644 --- a/metrics-exporter-log/Cargo.toml +++ b/metrics-exporter-log/Cargo.toml @@ -18,5 +18,4 @@ keywords = ["metrics", "metrics-core", "exporter", "log"] [dependencies] metrics-core = { path = "../metrics-core", version = "^0.5" } log = "^0.4" -futures = "^0.1" -tokio-timer = "^0.2" +tokio = { version = "0.2", features = ["time"] } diff --git a/metrics-exporter-log/src/lib.rs b/metrics-exporter-log/src/lib.rs index c591e11..80467fa 100644 --- a/metrics-exporter-log/src/lib.rs +++ b/metrics-exporter-log/src/lib.rs @@ -5,19 +5,18 @@ //! level. //! //! # Run Modes -//! - `run` can be used to block the current thread, taking snapshots and exporting them on an -//! interval -//! - `into_future` will return a [`Future`] that when driven will take a snapshot on the -//! configured interval and log it +//! - Using `run` will block the current thread, capturing a snapshot and logging it based on the +//! configured interval. +//! - Using `async_run` will return a future that can be awaited on, mimicing the behavior of +//! `run`. #![deny(missing_docs)] #[macro_use] extern crate log; -use futures::prelude::*; use log::Level; use metrics_core::{Builder, Drain, Observe, Observer}; use std::{thread, time::Duration}; -use tokio_timer::Interval; +use tokio::time; /// Exports metrics by converting them to a textual representation and logging them. pub struct LogExporter @@ -65,14 +64,13 @@ where log!(self.level, "{}", output); } - /// Converts this exporter into a future which logs output at the intervel + /// Converts this exporter into a future which logs output at the interval /// given on construction. - pub fn into_future(mut self) -> impl Future { - Interval::new_interval(self.interval) - .map_err(|_| ()) - .for_each(move |_| { - self.turn(); - Ok(()) - }) + pub async fn async_run(mut self) { + let mut interval = time::interval(self.interval); + loop { + interval.tick().await; + self.turn(); + } } } diff --git a/metrics-runtime/Cargo.toml b/metrics-runtime/Cargo.toml index 0233e01..13c2967 100644 --- a/metrics-runtime/Cargo.toml +++ b/metrics-runtime/Cargo.toml @@ -32,7 +32,6 @@ im = "^12" arc-swap = "^0.3" parking_lot = "^0.9" quanta = "^0.3" -futures = "^0.1" crossbeam-utils = "^0.6" metrics-exporter-log = { path = "../metrics-exporter-log", version = "^0.3", optional = true } metrics-exporter-http = { path = "../metrics-exporter-http", version = "^0.2", optional = true } @@ -47,3 +46,4 @@ getopts = "^0.2" hdrhistogram = "^6.1" criterion = "^0.2.9" lazy_static = "^1.3" +tokio = { version = "^0.2", features = ["macros", "rt-core"] } \ No newline at end of file diff --git a/metrics-runtime/examples/facade.rs b/metrics-runtime/examples/facade.rs index 507ae3c..d0d7369 100644 --- a/metrics-runtime/examples/facade.rs +++ b/metrics-runtime/examples/facade.rs @@ -5,6 +5,7 @@ extern crate getopts; extern crate hdrhistogram; extern crate metrics_core; extern crate metrics_runtime; +extern crate tokio; #[macro_use] extern crate metrics; @@ -119,7 +120,8 @@ pub fn opts() -> Options { opts } -fn main() { +#[tokio::main] +async fn main() { env_logger::init(); let args: Vec = env::args().collect(); @@ -168,7 +170,7 @@ fn main() { .expect("failed to parse http listen address"); let builder = JsonBuilder::new().set_pretty_json(true); let exporter = HttpExporter::new(controller.clone(), builder, addr); - thread::spawn(move || exporter.run()); + tokio::spawn(exporter.async_run()); receiver.install(); info!("receiver configured");