From 4f7e45bb24ae76083e2ce1728a24f5e34fde10e5 Mon Sep 17 00:00:00 2001 From: Illia Bobyr Date: Thu, 11 Apr 2024 22:02:44 -0700 Subject: [PATCH] metrics: Submit metrics when exiting. Refactor `MetricsAgent::run()`. (#718) There are a few minor issues this change addresses: 1. When we send points to the `MetricsWriter` we are calling `Instant::now()` twice, using the first result in the metrics stats, and using the seconds value for `last_write_time`. Yet, on the next upload, we would use `last_write_time` as a reference point. We upload metrics using a network call, so it is far from instantaneous. This creates a minor discrepancy in our time reporting. Good news is that we do not really need to call `Instant::now()` twice at all, as we can use the same value for both stats and `last_write_time`. 2. We did not report metrics stats if we did not have any points accumulated. It seems better to always report metric stats, including when no points have been accumulated. In practice, this does not happen for the validator, as validators always report something during a 10-second accumulation interval. 3. We did not upload any points when the metrics thread was existing. This may cause a short number of metrics not to be reported. 4. `collect_points()` was always converting both `points` and `counters` into a vector of `DataPoint`, even if the final length was over the specified `max_points`. In the `mainnet-beta` we have values of up to 5m points lost, so it could be a small optimization if we drop them sooner. --- metrics/src/metrics.rs | 144 ++++++++++++++++++++++++++--------------- 1 file changed, 92 insertions(+), 52 deletions(-) diff --git a/metrics/src/metrics.rs b/metrics/src/metrics.rs index b989ada68..20622974c 100644 --- a/metrics/src/metrics.rs +++ b/metrics/src/metrics.rs @@ -203,51 +203,84 @@ impl MetricsAgent { Self { sender } } - fn collect_points(points: &mut Vec, counters: &mut CounterMap) -> Vec { - let mut ret = std::mem::take(points); - ret.extend(counters.values().map(|v| v.into())); - counters.clear(); - ret - } - - fn write( - writer: &Arc, - mut points: Vec, + // Combines `points` and `counters` into a single array of `DataPoint`s, appending a data point + // with the metrics stats at the end. + // + // Limits the number of produced points to the `max_points` value. Takes `points` followed by + // `counters`, dropping `counters` first. + // + // `max_points_per_sec` is only used in a warning message. + // `points_buffered` is used in the stats. + fn combine_points( max_points: usize, max_points_per_sec: usize, - last_write_time: Instant, + secs_since_last_write: u64, points_buffered: usize, - ) { - if points.is_empty() { - return; - } + points: &mut Vec, + counters: &mut CounterMap, + ) -> Vec { + // Reserve one slot for the stats point we will add at the end. + let max_points = max_points.saturating_sub(1); + + let num_points = points.len().saturating_add(counters.len()); + let fit_counters = max_points.saturating_sub(points.len()); + let points_written = cmp::min(num_points, max_points); - let now = Instant::now(); - let num_points = points.len(); debug!("run: attempting to write {} points", num_points); + if num_points > max_points { warn!( - "max submission rate of {} datapoints per second exceeded. only the - first {} of {} points will be submitted", + "Max submission rate of {} datapoints per second exceeded. Only the \ + first {} of {} points will be submitted.", max_points_per_sec, max_points, num_points ); } - let points_written = cmp::min(num_points, max_points - 1); - points.truncate(points_written); - points.push( + + let mut combined = std::mem::take(points); + combined.truncate(points_written); + + combined.extend(counters.values().take(fit_counters).map(|v| v.into())); + counters.clear(); + + combined.push( DataPoint::new("metrics") .add_field_i64("points_written", points_written as i64) .add_field_i64("num_points", num_points as i64) .add_field_i64("points_lost", (num_points - points_written) as i64) .add_field_i64("points_buffered", points_buffered as i64) - .add_field_i64( - "secs_since_last_write", - now.duration_since(last_write_time).as_secs() as i64, - ) + .add_field_i64("secs_since_last_write", secs_since_last_write as i64) .to_owned(), ); - writer.write(points); + combined + } + + // Consumes provided `points`, sending up to `max_points` of them into the `writer`. + // + // Returns an updated value for `last_write_time`. Which is equal to `Instant::now()`, just + // before `write` in updated. + fn write( + writer: &Arc, + max_points: usize, + max_points_per_sec: usize, + last_write_time: Instant, + points_buffered: usize, + points: &mut Vec, + counters: &mut CounterMap, + ) -> Instant { + let now = Instant::now(); + let secs_since_last_write = now.duration_since(last_write_time).as_secs(); + + writer.write(Self::combine_points( + max_points, + max_points_per_sec, + secs_since_last_write, + points_buffered, + points, + counters, + )); + + now } fn run( @@ -263,20 +296,28 @@ impl MetricsAgent { let max_points = write_frequency.as_secs() as usize * max_points_per_sec; + // Bind common arguments in the `Self::write()` call. + let write = |last_write_time: Instant, + points: &mut Vec, + counters: &mut CounterMap| + -> Instant { + Self::write( + writer, + max_points, + max_points_per_sec, + last_write_time, + receiver.len(), + points, + counters, + ) + }; + loop { match receiver.recv_timeout(write_frequency / 2) { Ok(cmd) => match cmd { MetricsCommand::Flush(barrier) => { debug!("metrics_thread: flush"); - Self::write( - writer, - Self::collect_points(&mut points, &mut counters), - max_points, - max_points_per_sec, - last_write_time, - receiver.len(), - ); - last_write_time = Instant::now(); + last_write_time = write(last_write_time, &mut points, &mut counters); barrier.wait(); } MetricsCommand::Submit(point, level) => { @@ -293,9 +334,7 @@ impl MetricsAgent { } } }, - Err(RecvTimeoutError::Timeout) => { - trace!("run: receive timeout"); - } + Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => { debug!("run: sender disconnected"); break; @@ -304,17 +343,11 @@ impl MetricsAgent { let now = Instant::now(); if now.duration_since(last_write_time) >= write_frequency { - Self::write( - writer, - Self::collect_points(&mut points, &mut counters), - max_points, - max_points_per_sec, - last_write_time, - receiver.len(), - ); - last_write_time = now; + last_write_time = write(last_write_time, &mut points, &mut counters); } } + + let _ = write(last_write_time, &mut points, &mut counters); trace!("run: exit"); } @@ -628,12 +661,15 @@ mod test { #[test] fn test_submit_exceed_max_rate() { let writer = Arc::new(MockMetricsWriter::new()); - let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 100); - for i in 0..102 { + let max_points_per_sec = 100; + + let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), max_points_per_sec); + + for i in 0..(max_points_per_sec + 20) { agent.submit( DataPoint::new("measurement") - .add_field_i64("i", i) + .add_field_i64("i", i.try_into().unwrap()) .to_owned(), Level::Info, ); @@ -642,7 +678,11 @@ mod test { thread::sleep(Duration::from_secs(2)); agent.flush(); - assert_eq!(writer.points_written(), 100); + + // We are expecting `max_points_per_sec - 1` data points from `submit()` and two more metric + // stats data points. One from the timeout when all the `submit()`ed values are sent when 1 + // second is elapsed, and then one more from the explicit `flush()`. + assert_eq!(writer.points_written(), max_points_per_sec + 1); } #[test]