metric optimization: simply data structure used for metric datapoints and counter storage (#24447)
* remove redudant if * check loglevel enable before submit metrics * optimize metric data structure * add metrics benches * clippy: add default impl * remove clone * add random benches * use mem::swap to exchange points vec
This commit is contained in:
parent
db32549c00
commit
71ad121282
|
@ -25,5 +25,8 @@ serial_test = "0.6.0"
|
||||||
[lib]
|
[lib]
|
||||||
name = "solana_metrics"
|
name = "solana_metrics"
|
||||||
|
|
||||||
|
[[bench]]
|
||||||
|
name = "metrics"
|
||||||
|
|
||||||
[package.metadata.docs.rs]
|
[package.metadata.docs.rs]
|
||||||
targets = ["x86_64-unknown-linux-gnu"]
|
targets = ["x86_64-unknown-linux-gnu"]
|
||||||
|
|
|
@ -0,0 +1,72 @@
|
||||||
|
#![feature(test)]
|
||||||
|
|
||||||
|
extern crate test;
|
||||||
|
|
||||||
|
use {
|
||||||
|
log::*,
|
||||||
|
rand::distributions::{Distribution, Uniform},
|
||||||
|
solana_metrics::{
|
||||||
|
counter::CounterPoint,
|
||||||
|
datapoint::DataPoint,
|
||||||
|
metrics::{test_mocks::MockMetricsWriter, MetricsAgent},
|
||||||
|
},
|
||||||
|
std::{sync::Arc, time::Duration},
|
||||||
|
test::Bencher,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
fn bench_datapoint_submission(bencher: &mut Bencher) {
|
||||||
|
let writer = Arc::new(MockMetricsWriter::new());
|
||||||
|
let agent = MetricsAgent::new(writer, Duration::from_secs(10), 1000);
|
||||||
|
|
||||||
|
bencher.iter(|| {
|
||||||
|
for i in 0..1000 {
|
||||||
|
agent.submit(
|
||||||
|
DataPoint::new("measurement")
|
||||||
|
.add_field_i64("i", i)
|
||||||
|
.to_owned(),
|
||||||
|
Level::Info,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
agent.flush();
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
fn bench_counter_submission(bencher: &mut Bencher) {
|
||||||
|
let writer = Arc::new(MockMetricsWriter::new());
|
||||||
|
let agent = MetricsAgent::new(writer, Duration::from_secs(10), 1000);
|
||||||
|
|
||||||
|
bencher.iter(|| {
|
||||||
|
for i in 0..1000 {
|
||||||
|
agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, i);
|
||||||
|
}
|
||||||
|
agent.flush();
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
fn bench_random_submission(bencher: &mut Bencher) {
|
||||||
|
let writer = Arc::new(MockMetricsWriter::new());
|
||||||
|
let agent = MetricsAgent::new(writer, Duration::from_secs(10), 1000);
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let die = Uniform::<i32>::from(1..7);
|
||||||
|
|
||||||
|
bencher.iter(|| {
|
||||||
|
for i in 0..1000 {
|
||||||
|
let dice = die.sample(&mut rng);
|
||||||
|
|
||||||
|
if dice == 6 {
|
||||||
|
agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, i);
|
||||||
|
} else {
|
||||||
|
agent.submit(
|
||||||
|
DataPoint::new("measurement")
|
||||||
|
.add_field_i64("i", i as i64)
|
||||||
|
.to_owned(),
|
||||||
|
Level::Info,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
agent.flush();
|
||||||
|
})
|
||||||
|
}
|
|
@ -32,7 +32,6 @@ pub struct CounterPoint {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CounterPoint {
|
impl CounterPoint {
|
||||||
#[cfg(test)]
|
|
||||||
pub fn new(name: &'static str) -> Self {
|
pub fn new(name: &'static str) -> Self {
|
||||||
CounterPoint {
|
CounterPoint {
|
||||||
name,
|
name,
|
||||||
|
@ -59,11 +58,7 @@ macro_rules! create_counter {
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! inc_counter {
|
macro_rules! inc_counter {
|
||||||
($name:expr, $level:expr, $count:expr) => {
|
($name:expr, $level:expr, $count:expr) => {
|
||||||
unsafe {
|
unsafe { $name.inc($level, $count) };
|
||||||
if log_enabled!($level) {
|
|
||||||
$name.inc($level, $count)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
#![allow(clippy::integer_arithmetic)]
|
#![allow(clippy::integer_arithmetic)]
|
||||||
pub mod counter;
|
pub mod counter;
|
||||||
pub mod datapoint;
|
pub mod datapoint;
|
||||||
mod metrics;
|
pub mod metrics;
|
||||||
pub mod poh_timing_point;
|
pub mod poh_timing_point;
|
||||||
pub use crate::metrics::{flush, query, set_host_id, set_panic_hook, submit};
|
pub use crate::metrics::{flush, query, set_host_id, set_panic_hook, submit};
|
||||||
use std::sync::{
|
use std::sync::{
|
||||||
|
|
|
@ -36,11 +36,11 @@ enum MetricsCommand {
|
||||||
SubmitCounter(CounterPoint, log::Level, u64),
|
SubmitCounter(CounterPoint, log::Level, u64),
|
||||||
}
|
}
|
||||||
|
|
||||||
struct MetricsAgent {
|
pub struct MetricsAgent {
|
||||||
sender: Sender<MetricsCommand>,
|
sender: Sender<MetricsCommand>,
|
||||||
}
|
}
|
||||||
|
|
||||||
trait MetricsWriter {
|
pub trait MetricsWriter {
|
||||||
// Write the points and empty the vector. Called on the internal
|
// Write the points and empty the vector. Called on the internal
|
||||||
// MetricsAgent worker thread.
|
// MetricsAgent worker thread.
|
||||||
fn write(&self, points: Vec<DataPoint>);
|
fn write(&self, points: Vec<DataPoint>);
|
||||||
|
@ -148,7 +148,7 @@ impl Default for MetricsAgent {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MetricsAgent {
|
impl MetricsAgent {
|
||||||
fn new(
|
pub fn new(
|
||||||
writer: Arc<dyn MetricsWriter + Send + Sync>,
|
writer: Arc<dyn MetricsWriter + Send + Sync>,
|
||||||
write_frequency: Duration,
|
write_frequency: Duration,
|
||||||
max_points_per_sec: usize,
|
max_points_per_sec: usize,
|
||||||
|
@ -159,25 +159,13 @@ impl MetricsAgent {
|
||||||
Self { sender }
|
Self { sender }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn collect_points(
|
fn collect_points(points: &mut Vec<DataPoint>, counters: &mut CounterMap) -> Vec<DataPoint> {
|
||||||
points_map: &mut HashMap<log::Level, (CounterMap, Vec<DataPoint>)>,
|
let mut ret: Vec<DataPoint> = Vec::default();
|
||||||
) -> Vec<DataPoint> {
|
std::mem::swap(&mut ret, points);
|
||||||
let points: Vec<DataPoint> = [
|
for (_, v) in counters.drain() {
|
||||||
Level::Error,
|
ret.push(v.into());
|
||||||
Level::Warn,
|
}
|
||||||
Level::Info,
|
ret
|
||||||
Level::Debug,
|
|
||||||
Level::Trace,
|
|
||||||
]
|
|
||||||
.iter()
|
|
||||||
.filter_map(|level| points_map.remove(level))
|
|
||||||
.flat_map(|(counters, points)| {
|
|
||||||
let counter_points = counters.into_iter().map(|(_, v)| v.into());
|
|
||||||
points.into_iter().chain(counter_points)
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
points_map.clear();
|
|
||||||
points
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write(
|
fn write(
|
||||||
|
@ -219,6 +207,7 @@ impl MetricsAgent {
|
||||||
|
|
||||||
writer.write(points);
|
writer.write(points);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run(
|
fn run(
|
||||||
receiver: &Receiver<MetricsCommand>,
|
receiver: &Receiver<MetricsCommand>,
|
||||||
writer: &Arc<dyn MetricsWriter + Send + Sync>,
|
writer: &Arc<dyn MetricsWriter + Send + Sync>,
|
||||||
|
@ -227,7 +216,9 @@ impl MetricsAgent {
|
||||||
) {
|
) {
|
||||||
trace!("run: enter");
|
trace!("run: enter");
|
||||||
let mut last_write_time = Instant::now();
|
let mut last_write_time = Instant::now();
|
||||||
let mut points_map = HashMap::<log::Level, (CounterMap, Vec<DataPoint>)>::new();
|
let mut points = Vec::<DataPoint>::new();
|
||||||
|
let mut counters = CounterMap::new();
|
||||||
|
|
||||||
let max_points = write_frequency.as_secs() as usize * max_points_per_sec;
|
let max_points = write_frequency.as_secs() as usize * max_points_per_sec;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
@ -237,7 +228,7 @@ impl MetricsAgent {
|
||||||
debug!("metrics_thread: flush");
|
debug!("metrics_thread: flush");
|
||||||
Self::write(
|
Self::write(
|
||||||
writer,
|
writer,
|
||||||
Self::collect_points(&mut points_map),
|
Self::collect_points(&mut points, &mut counters),
|
||||||
max_points,
|
max_points,
|
||||||
max_points_per_sec,
|
max_points_per_sec,
|
||||||
last_write_time,
|
last_write_time,
|
||||||
|
@ -248,17 +239,10 @@ impl MetricsAgent {
|
||||||
}
|
}
|
||||||
MetricsCommand::Submit(point, level) => {
|
MetricsCommand::Submit(point, level) => {
|
||||||
log!(level, "{}", point);
|
log!(level, "{}", point);
|
||||||
let (_, points) = points_map
|
|
||||||
.entry(level)
|
|
||||||
.or_insert((HashMap::new(), Vec::new()));
|
|
||||||
points.push(point);
|
points.push(point);
|
||||||
}
|
}
|
||||||
MetricsCommand::SubmitCounter(counter, level, bucket) => {
|
MetricsCommand::SubmitCounter(counter, _level, bucket) => {
|
||||||
debug!("{:?}", counter);
|
debug!("{:?}", counter);
|
||||||
let (counters, _) = points_map
|
|
||||||
.entry(level)
|
|
||||||
.or_insert((HashMap::new(), Vec::new()));
|
|
||||||
|
|
||||||
let key = (counter.name, bucket);
|
let key = (counter.name, bucket);
|
||||||
if let Some(value) = counters.get_mut(&key) {
|
if let Some(value) = counters.get_mut(&key) {
|
||||||
value.count += counter.count;
|
value.count += counter.count;
|
||||||
|
@ -280,7 +264,7 @@ impl MetricsAgent {
|
||||||
if now.duration_since(last_write_time) >= write_frequency {
|
if now.duration_since(last_write_time) >= write_frequency {
|
||||||
Self::write(
|
Self::write(
|
||||||
writer,
|
writer,
|
||||||
Self::collect_points(&mut points_map),
|
Self::collect_points(&mut points, &mut counters),
|
||||||
max_points,
|
max_points,
|
||||||
max_points_per_sec,
|
max_points_per_sec,
|
||||||
last_write_time,
|
last_write_time,
|
||||||
|
@ -465,25 +449,31 @@ pub fn set_panic_hook(program: &'static str, version: Option<String>) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
pub mod test_mocks {
|
||||||
mod test {
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
struct MockMetricsWriter {
|
pub struct MockMetricsWriter {
|
||||||
points_written: Arc<Mutex<Vec<DataPoint>>>,
|
pub points_written: Arc<Mutex<Vec<DataPoint>>>,
|
||||||
}
|
}
|
||||||
impl MockMetricsWriter {
|
impl MockMetricsWriter {
|
||||||
fn new() -> Self {
|
#[allow(dead_code)]
|
||||||
|
pub fn new() -> Self {
|
||||||
MockMetricsWriter {
|
MockMetricsWriter {
|
||||||
points_written: Arc::new(Mutex::new(Vec::new())),
|
points_written: Arc::new(Mutex::new(Vec::new())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn points_written(&self) -> usize {
|
pub fn points_written(&self) -> usize {
|
||||||
self.points_written.lock().unwrap().len()
|
self.points_written.lock().unwrap().len()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for MockMetricsWriter {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl MetricsWriter for MockMetricsWriter {
|
impl MetricsWriter for MockMetricsWriter {
|
||||||
fn write(&self, points: Vec<DataPoint>) {
|
fn write(&self, points: Vec<DataPoint>) {
|
||||||
assert!(!points.is_empty());
|
assert!(!points.is_empty());
|
||||||
|
@ -501,6 +491,11 @@ mod test {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use {super::*, test_mocks::MockMetricsWriter};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_submit() {
|
fn test_submit() {
|
||||||
|
|
Loading…
Reference in New Issue