From e2830f5b0e645eab4d3225f40a2e17547ce17198 Mon Sep 17 00:00:00 2001 From: Dan Albert Date: Mon, 13 May 2019 14:17:25 -0600 Subject: [PATCH] Add rate limit to metrics datapoint submission (#4237) Cleanup Raise limit on submission threshold Pick nits and add metrics point fmt Fixup compiler warning Cleanup if-else Append new point to vec rather than submit --- metrics/src/metrics.rs | 82 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 72 insertions(+), 10 deletions(-) diff --git a/metrics/src/metrics.rs b/metrics/src/metrics.rs index d3f99ddaf..f8f022063 100644 --- a/metrics/src/metrics.rs +++ b/metrics/src/metrics.rs @@ -133,28 +133,38 @@ impl Default for MetricsAgent { Self::new( Arc::new(InfluxDbMetricsWriter::new()), Duration::from_secs(10), + //max per-second datapoint submission limit + 4000, ) } } impl MetricsAgent { - fn new(writer: Arc, write_frequency: Duration) -> Self { + fn new( + writer: Arc, + write_frequency_secs: Duration, + max_points_per_sec: usize, + ) -> Self { let (sender, receiver) = channel::(); - thread::spawn(move || Self::run(&receiver, &writer, write_frequency)); + thread::spawn(move || { + Self::run(&receiver, &writer, write_frequency_secs, max_points_per_sec) + }); Self { sender } } fn run( receiver: &Receiver, writer: &Arc, - write_frequency: Duration, + write_frequency_secs: Duration, + max_points_per_sec: usize, ) { trace!("run: enter"); let mut last_write_time = Instant::now(); let mut points = Vec::new(); + let max_points = write_frequency_secs.as_secs() as usize * max_points_per_sec; loop { - match receiver.recv_timeout(write_frequency / 2) { + match receiver.recv_timeout(write_frequency_secs / 2) { Ok(cmd) => match cmd { MetricsCommand::Flush(barrier) => { debug!("metrics_thread: flush"); @@ -180,8 +190,44 @@ impl MetricsAgent { } let now = Instant::now(); - if now.duration_since(last_write_time) >= write_frequency && !points.is_empty() { - debug!("run: writing {} points", points.len()); + if now.duration_since(last_write_time) >= write_frequency_secs && !points.is_empty() { + let num_points = points.len(); + let points_written; + debug!("run: attempting to write {} points", points.len()); + if points.len() > max_points { + warn!( + "max submission rate of {} datapoints per second exceeded. only the + first {} of {} points will be submitted", + max_points_per_sec, + max_points, + points.len() + ); + points.truncate(max_points - 1); + } + points_written = points.len(); + + points.push( + influxdb::Point::new("metrics") + .add_timestamp(timing::timestamp() as i64) + .add_field("host_id", influxdb::Value::String(HOST_INFO.to_string())) + .add_field( + "points_written", + influxdb::Value::Integer(points_written as i64), + ) + .add_field("num_points", influxdb::Value::Integer(num_points as i64)) + .add_field( + "secs_since_last_write", + influxdb::Value::Integer( + now.duration_since(last_write_time).as_secs() as i64 + ), + ) + .add_field( + "points_rate_exceeded", + influxdb::Value::Boolean(num_points > max_points), + ) + .to_owned(), + ); + writer.write(points); points = Vec::new(); last_write_time = now; @@ -347,7 +393,7 @@ mod test { #[test] fn test_submit() { let writer = Arc::new(MockMetricsWriter::new()); - let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10)); + let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10), 1000); for i in 0..42 { agent.submit(influxdb::Point::new(&format!("measurement {}", i))); @@ -360,11 +406,26 @@ mod test { #[test] fn test_submit_with_delay() { let writer = Arc::new(MockMetricsWriter::new()); - let agent = MetricsAgent::new(writer.clone(), Duration::from_millis(100)); + let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 1000); agent.submit(influxdb::Point::new("point 1")); thread::sleep(Duration::from_secs(2)); - assert_eq!(writer.points_written(), 1); + assert_eq!(writer.points_written(), 2); + } + + #[test] + fn test_submit_exceed_max_rate() { + let writer = Arc::new(MockMetricsWriter::new()); + let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 100); + + for i in 0..102 { + agent.submit(influxdb::Point::new(&format!("measurement {}", i))); + } + + thread::sleep(Duration::from_secs(2)); + + agent.flush(); + assert_eq!(writer.points_written(), 100); } #[test] @@ -373,6 +434,7 @@ mod test { let agent = Arc::new(Mutex::new(MetricsAgent::new( writer.clone(), Duration::from_secs(10), + 1000, ))); // @@ -399,7 +461,7 @@ mod test { fn test_flush_before_drop() { let writer = Arc::new(MockMetricsWriter::new()); { - let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(9999999)); + let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(9999999), 1000); agent.submit(influxdb::Point::new("point 1")); }