Migrate to core::futures (#65)

* Migrate to core::futures

This drops all dependencies on the 'futures' crate, and upgrades to
hyper-0.13 and tokio-0.2. Methods that return futures now use
async/await.

Since hyper has removed hyper_run(), I have in turn removed the
HttpExporter::run() method. The user should just call async_run()
and spawn or await the result however they wish.

* Bump CI minimum rust to 1.39 for async/await

Co-authored-by: Toby Lawrence <tobz@users.noreply.github.com>
This commit is contained in:
Daniel Fox Franke 2020-01-14 23:11:42 -05:00 committed by Toby Lawrence
parent b48d65aca1
commit cf5691fe2a
7 changed files with 46 additions and 79 deletions

View File

@ -15,6 +15,6 @@ jobs:
steps:
- template: azure-install-rust.yml
parameters:
rust_version: 1.34.0
rust_version: 1.39.0
- script: cargo test
displayName: cargo test

View File

@ -17,5 +17,5 @@ keywords = ["metrics", "metrics-core", "exporter", "http"]
[dependencies]
metrics-core = { path = "../metrics-core", version = "^0.5" }
hyper = "^0.12"
hyper = "^0.13"
log = "^0.4"

View File

@ -3,20 +3,12 @@
//! This exporter can utilize observers that are able to be converted to a textual representation
//! via [`Drain<String>`]. It will respond to any requests, regardless of the method or path.
//!
//! # Run Modes
//! - `run` can be used to block the current thread, running the HTTP server on the configured
//! address
//! - `into_future` will return a [`Future`] that when driven will run the HTTP server on the
//! configured address
//! Awaiting on `async_run` will drive an HTTP server listening on the configured address.
#![deny(missing_docs)]
#[macro_use]
extern crate log;
use hyper::rt::run as hyper_run;
use hyper::{
rt::Future,
service::service_fn_ok,
{Body, Response, Server},
service::{make_service_fn, service_fn},
{Body, Error, Response, Server},
};
use metrics_core::{Builder, Drain, Observe, Observer};
use std::{net::SocketAddr, sync::Arc};
@ -45,55 +37,31 @@ where
}
}
/// Run the exporter on the current thread.
///
/// This starts an HTTP server on the `address` the exporter was originally configured with,
/// Starts an HTTP server on the `address` the exporter was originally configured with,
/// responding to any request with the output of the configured observer.
pub fn run(self) {
let server = self.into_future();
hyper_run(server);
}
pub async fn async_run(self) -> hyper::error::Result<()> {
let builder = Arc::new(self.builder);
let controller = Arc::new(self.controller);
/// Converts this exporter into a future which can be driven externally.
///
/// This starts an HTTP server on the `address` the exporter was originally configured with,
/// responding to any request with the output of the configured observer.
pub fn into_future(self) -> impl Future<Item = (), Error = ()> {
let controller = self.controller;
let builder = self.builder;
let address = self.address;
let make_svc = make_service_fn(move |_| {
let builder = builder.clone();
let controller = controller.clone();
build_hyper_server(controller, builder, address)
async move {
Ok::<_, Error>(service_fn(move |_| {
let builder = builder.clone();
let controller = controller.clone();
async move {
let mut observer = builder.build();
controller.observe(&mut observer);
let output = observer.drain();
Ok::<_, Error>(Response::new(Body::from(output)))
}
}))
}
});
Server::bind(&self.address).serve(make_svc).await
}
}
fn build_hyper_server<C, B>(
controller: C,
builder: B,
address: SocketAddr,
) -> impl Future<Item = (), Error = ()>
where
C: Observe + Send + Sync + 'static,
B: Builder + Send + Sync + 'static,
B::Output: Drain<String> + Observer,
{
let builder = Arc::new(builder);
let controller = Arc::new(controller);
let service = move || {
let controller2 = controller.clone();
let builder = builder.clone();
service_fn_ok(move |_| {
let mut observer = builder.build();
controller2.observe(&mut observer);
let output = observer.drain();
Response::new(Body::from(output))
})
};
Server::bind(&address)
.serve(service)
.map_err(|e| error!("http exporter server error: {}", e))
}

View File

@ -18,5 +18,4 @@ keywords = ["metrics", "metrics-core", "exporter", "log"]
[dependencies]
metrics-core = { path = "../metrics-core", version = "^0.5" }
log = "^0.4"
futures = "^0.1"
tokio-timer = "^0.2"
tokio = { version = "0.2", features = ["time"] }

View File

@ -5,19 +5,18 @@
//! level.
//!
//! # Run Modes
//! - `run` can be used to block the current thread, taking snapshots and exporting them on an
//! interval
//! - `into_future` will return a [`Future`] that when driven will take a snapshot on the
//! configured interval and log it
//! - Using `run` will block the current thread, capturing a snapshot and logging it based on the
//! configured interval.
//! - Using `async_run` will return a future that can be awaited on, mimicing the behavior of
//! `run`.
#![deny(missing_docs)]
#[macro_use]
extern crate log;
use futures::prelude::*;
use log::Level;
use metrics_core::{Builder, Drain, Observe, Observer};
use std::{thread, time::Duration};
use tokio_timer::Interval;
use tokio::time;
/// Exports metrics by converting them to a textual representation and logging them.
pub struct LogExporter<C, B>
@ -65,14 +64,13 @@ where
log!(self.level, "{}", output);
}
/// Converts this exporter into a future which logs output at the intervel
/// Converts this exporter into a future which logs output at the interval
/// given on construction.
pub fn into_future(mut self) -> impl Future<Item = (), Error = ()> {
Interval::new_interval(self.interval)
.map_err(|_| ())
.for_each(move |_| {
self.turn();
Ok(())
})
pub async fn async_run(mut self) {
let mut interval = time::interval(self.interval);
loop {
interval.tick().await;
self.turn();
}
}
}

View File

@ -32,7 +32,6 @@ im = "^12"
arc-swap = "^0.3"
parking_lot = "^0.9"
quanta = "^0.3"
futures = "^0.1"
crossbeam-utils = "^0.6"
metrics-exporter-log = { path = "../metrics-exporter-log", version = "^0.3", optional = true }
metrics-exporter-http = { path = "../metrics-exporter-http", version = "^0.2", optional = true }
@ -47,3 +46,4 @@ getopts = "^0.2"
hdrhistogram = "^6.1"
criterion = "^0.2.9"
lazy_static = "^1.3"
tokio = { version = "^0.2", features = ["macros", "rt-core"] }

View File

@ -5,6 +5,7 @@ extern crate getopts;
extern crate hdrhistogram;
extern crate metrics_core;
extern crate metrics_runtime;
extern crate tokio;
#[macro_use]
extern crate metrics;
@ -119,7 +120,8 @@ pub fn opts() -> Options {
opts
}
fn main() {
#[tokio::main]
async fn main() {
env_logger::init();
let args: Vec<String> = env::args().collect();
@ -168,7 +170,7 @@ fn main() {
.expect("failed to parse http listen address");
let builder = JsonBuilder::new().set_pretty_json(true);
let exporter = HttpExporter::new(controller.clone(), builder, addr);
thread::spawn(move || exporter.run());
tokio::spawn(exporter.async_run());
receiver.install();
info!("receiver configured");