metrics: Submit metrics when exiting. Refactor `MetricsAgent::run()`. (#718)

There are a few minor issues this change addresses:

1. When we send points to the `MetricsWriter` we are calling
   `Instant::now()` twice, using the first result in the metrics stats,
   and using the second value for `last_write_time`.  Yet, on the next
   upload, we would use `last_write_time` as a reference point.

   We upload metrics using a network call, so it is far from
   instantaneous.  This creates a minor discrepancy in our time
   reporting.

   Good news is that we do not really need to call `Instant::now()`
   twice at all, as we can use the same value for both stats and
   `last_write_time`.

2. We did not report metrics stats if we did not have any points
   accumulated.  It seems better to always report metric stats,
   including when no points have been accumulated.  In practice, this
   does not happen for the validator, as validators always report
   something during a 10-second accumulation interval.

3. We did not upload any points when the metrics thread was exiting.
   This may cause a small number of metrics not to be reported.

4. `collect_points()` was always converting both `points` and `counters`
   into a vector of `DataPoint`, even if the final length was over the
   specified `max_points`.  In the `mainnet-beta` we have values of up
   to 5m points lost, so it could be a small optimization if we drop
   them sooner.
This commit is contained in:
Illia Bobyr 2024-04-11 22:02:44 -07:00 committed by GitHub
parent dabcc39819
commit 4f7e45bb24
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 92 additions and 52 deletions

View File

@ -203,51 +203,84 @@ impl MetricsAgent {
Self { sender }
}
fn collect_points(points: &mut Vec<DataPoint>, counters: &mut CounterMap) -> Vec<DataPoint> {
let mut ret = std::mem::take(points);
ret.extend(counters.values().map(|v| v.into()));
counters.clear();
ret
}
fn write(
writer: &Arc<dyn MetricsWriter + Send + Sync>,
mut points: Vec<DataPoint>,
// Combines `points` and `counters` into a single array of `DataPoint`s, appending a data point
// with the metrics stats at the end.
//
// Limits the number of produced points to the `max_points` value. Takes `points` followed by
// `counters`, dropping `counters` first.
//
// `max_points_per_sec` is only used in a warning message.
// `points_buffered` is used in the stats.
fn combine_points(
max_points: usize,
max_points_per_sec: usize,
last_write_time: Instant,
secs_since_last_write: u64,
points_buffered: usize,
) {
if points.is_empty() {
return;
}
points: &mut Vec<DataPoint>,
counters: &mut CounterMap,
) -> Vec<DataPoint> {
// Reserve one slot for the stats point we will add at the end.
let max_points = max_points.saturating_sub(1);
let num_points = points.len().saturating_add(counters.len());
let fit_counters = max_points.saturating_sub(points.len());
let points_written = cmp::min(num_points, max_points);
let now = Instant::now();
let num_points = points.len();
debug!("run: attempting to write {} points", num_points);
if num_points > max_points {
warn!(
"max submission rate of {} datapoints per second exceeded. only the
first {} of {} points will be submitted",
"Max submission rate of {} datapoints per second exceeded. Only the \
first {} of {} points will be submitted.",
max_points_per_sec, max_points, num_points
);
}
let points_written = cmp::min(num_points, max_points - 1);
points.truncate(points_written);
points.push(
let mut combined = std::mem::take(points);
combined.truncate(points_written);
combined.extend(counters.values().take(fit_counters).map(|v| v.into()));
counters.clear();
combined.push(
DataPoint::new("metrics")
.add_field_i64("points_written", points_written as i64)
.add_field_i64("num_points", num_points as i64)
.add_field_i64("points_lost", (num_points - points_written) as i64)
.add_field_i64("points_buffered", points_buffered as i64)
.add_field_i64(
"secs_since_last_write",
now.duration_since(last_write_time).as_secs() as i64,
)
.add_field_i64("secs_since_last_write", secs_since_last_write as i64)
.to_owned(),
);
writer.write(points);
combined
}
// Consumes provided `points`, sending up to `max_points` of them into the `writer`.
//
// Returns an updated value for `last_write_time`, which is the `Instant::now()` value taken just
// before the points are passed to the `writer`.
fn write(
writer: &Arc<dyn MetricsWriter + Send + Sync>,
max_points: usize,
max_points_per_sec: usize,
last_write_time: Instant,
points_buffered: usize,
points: &mut Vec<DataPoint>,
counters: &mut CounterMap,
) -> Instant {
let now = Instant::now();
let secs_since_last_write = now.duration_since(last_write_time).as_secs();
writer.write(Self::combine_points(
max_points,
max_points_per_sec,
secs_since_last_write,
points_buffered,
points,
counters,
));
now
}
fn run(
@ -263,20 +296,28 @@ impl MetricsAgent {
let max_points = write_frequency.as_secs() as usize * max_points_per_sec;
// Bind common arguments in the `Self::write()` call.
let write = |last_write_time: Instant,
points: &mut Vec<DataPoint>,
counters: &mut CounterMap|
-> Instant {
Self::write(
writer,
max_points,
max_points_per_sec,
last_write_time,
receiver.len(),
points,
counters,
)
};
loop {
match receiver.recv_timeout(write_frequency / 2) {
Ok(cmd) => match cmd {
MetricsCommand::Flush(barrier) => {
debug!("metrics_thread: flush");
Self::write(
writer,
Self::collect_points(&mut points, &mut counters),
max_points,
max_points_per_sec,
last_write_time,
receiver.len(),
);
last_write_time = Instant::now();
last_write_time = write(last_write_time, &mut points, &mut counters);
barrier.wait();
}
MetricsCommand::Submit(point, level) => {
@ -293,9 +334,7 @@ impl MetricsAgent {
}
}
},
Err(RecvTimeoutError::Timeout) => {
trace!("run: receive timeout");
}
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => {
debug!("run: sender disconnected");
break;
@ -304,17 +343,11 @@ impl MetricsAgent {
let now = Instant::now();
if now.duration_since(last_write_time) >= write_frequency {
Self::write(
writer,
Self::collect_points(&mut points, &mut counters),
max_points,
max_points_per_sec,
last_write_time,
receiver.len(),
);
last_write_time = now;
last_write_time = write(last_write_time, &mut points, &mut counters);
}
}
let _ = write(last_write_time, &mut points, &mut counters);
trace!("run: exit");
}
@ -628,12 +661,15 @@ mod test {
#[test]
fn test_submit_exceed_max_rate() {
let writer = Arc::new(MockMetricsWriter::new());
let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), 100);
for i in 0..102 {
let max_points_per_sec = 100;
let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), max_points_per_sec);
for i in 0..(max_points_per_sec + 20) {
agent.submit(
DataPoint::new("measurement")
.add_field_i64("i", i)
.add_field_i64("i", i.try_into().unwrap())
.to_owned(),
Level::Info,
);
@ -642,7 +678,11 @@ mod test {
thread::sleep(Duration::from_secs(2));
agent.flush();
assert_eq!(writer.points_written(), 100);
// We are expecting `max_points_per_sec - 1` data points from `submit()` and two more metric
// stats data points: one from the timeout, when all the `submit()`ed values are sent after 1
// second has elapsed, and one more from the explicit `flush()`.
assert_eq!(writer.points_written(), max_points_per_sec + 1);
}
#[test]