Add IO metrics (#26804)

* Add Disk IO metrics
This commit is contained in:
Brennan Watt 2022-08-02 14:29:53 -07:00 committed by GitHub
parent 224550d65f
commit f3b760dd91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 266 additions and 0 deletions

View File

@ -22,12 +22,15 @@ const SAMPLE_INTERVAL_UDP_MS: u64 = 2 * MS_PER_S;
const SAMPLE_INTERVAL_OS_NETWORK_LIMITS_MS: u64 = MS_PER_H;
const SAMPLE_INTERVAL_MEM_MS: u64 = MS_PER_S;
const SAMPLE_INTERVAL_CPU_MS: u64 = MS_PER_S;
// Interval handed to `disk_timer.should_update(..)` in `SystemMonitorService::run`
// to decide when the aggregated disk statistics are sampled again.
const SAMPLE_INTERVAL_DISK_MS: u64 = MS_PER_S;
// Pause between two iterations of the monitoring loop in `SystemMonitorService::run`.
const SLEEP_INTERVAL: Duration = Duration::from_millis(500);
#[cfg(target_os = "linux")]
const PROC_NET_SNMP_PATH: &str = "/proc/net/snmp";
#[cfg(target_os = "linux")]
const PROC_NET_DEV_PATH: &str = "/proc/net/dev";
// File opened by `read_disk_stats` and parsed by `parse_disk_stats`.
#[cfg(target_os = "linux")]
const PROC_DISKSTATS_PATH: &str = "/proc/diskstats";
pub struct SystemMonitorService {
thread_hdl: JoinHandle<()>,
@ -96,6 +99,32 @@ struct CpuInfo {
num_threads: u64,
}
/// Disk I/O counters summed over the devices that `parse_disk_stats` accepts.
///
/// `parse_disk_stats` skips every `/proc/diskstats` row whose device name starts
/// with `loop` or whose minor number is not `0`; the intent is to count raw
/// devices only and to ignore loopback devices and partitions.
///
/// All fields are cumulative since boot, with the exception of `num_disks` and
/// `io_in_progress`, which describe the state at the moment of sampling.
#[derive(Default)]
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
struct DiskStats {
    /// Column 4 of `/proc/diskstats` (1-based; columns 1-3 are major, minor, name).
    reads_completed: u64,
    /// Column 5.
    reads_merged: u64,
    /// Column 6.
    sectors_read: u64,
    /// Column 7.
    time_reading_ms: u64,
    /// Column 8.
    writes_completed: u64,
    /// Column 9.
    writes_merged: u64,
    /// Column 10.
    sectors_written: u64,
    /// Column 11.
    time_writing_ms: u64,
    /// Column 12. Not cumulative: the number of I/Os in flight when sampled.
    io_in_progress: u64,
    /// Column 13.
    time_io_ms: u64,
    /// Column 14.
    // weighted time multiplies time performing IO by number of commands in the queue
    time_io_weighted_ms: u64,
    /// Column 15.
    discards_completed: u64,
    /// Column 16.
    discards_merged: u64,
    /// Column 17.
    sectors_discarded: u64,
    /// Column 18.
    // NOTE(review): no unit suffix in the name; presumably milliseconds like the
    // other time fields - confirm against the kernel documentation.
    time_discarding: u64,
    /// Column 19.
    flushes_completed: u64,
    /// Column 20.
    // NOTE(review): no unit suffix in the name; presumably milliseconds - confirm.
    time_flushing: u64,
    /// Number of rows that passed the device filter and were summed.
    num_disks: u64,
}
impl UdpStats {
fn from_map(udp_stats: &HashMap<String, u64>) -> Self {
Self {
@ -223,12 +252,61 @@ pub fn verify_net_stats_access() -> Result<(), String> {
Ok(())
}
/// Opens `PROC_DISKSTATS_PATH` and returns the aggregated counters produced by
/// `parse_disk_stats`.
///
/// # Errors
/// Returns the stringified I/O error if the file cannot be opened, or whatever
/// error `parse_disk_stats` reports for its contents.
#[cfg(target_os = "linux")]
fn read_disk_stats() -> Result<DiskStats, String> {
    let mut reader = File::open(PROC_DISKSTATS_PATH)
        .map(BufReader::new)
        .map_err(|e| e.to_string())?;
    parse_disk_stats(&mut reader)
}
/// Parses `/proc/diskstats`-formatted text and sums the counters of every
/// accepted device into one [`DiskStats`].
///
/// Each line is split on ASCII whitespace into `major minor name` followed by
/// the statistics columns. Rows whose name starts with `loop` or whose minor
/// number is not `0` are skipped without being parsed.
///
/// The number of statistics columns depends on the kernel: 11 originally,
/// 15 once discard counters were added, 17 once flush counters were added.
/// Only the original 11 are required; trailing columns that are absent count
/// as zero and any columns beyond the 20th are ignored, so older and newer
/// kernels are both accepted.
///
/// # Errors
/// Returns a message if reading a line fails, if a line has fewer than 14
/// whitespace-separated fields, or if a counter of an accepted row is not a
/// valid `u64`.
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
fn parse_disk_stats(reader_diskstats: &mut impl BufRead) -> Result<DiskStats, String> {
    // major + minor + name + the 11 counters present on every supported kernel.
    const MIN_FIELDS: usize = 14;

    // Parses column `index`, treating a column the kernel did not emit as zero.
    fn field(values: &[&str], index: usize) -> Result<u64, String> {
        values.get(index).map_or(Ok(0), |raw| {
            raw.parse::<u64>().map_err(|e| e.to_string())
        })
    }

    let mut stats = DiskStats::default();
    for line in reader_diskstats.lines() {
        let line = line.map_err(|e| e.to_string())?;
        let values: Vec<_> = line.split_ascii_whitespace().collect();

        if values.len() < MIN_FIELDS {
            return Err(format!(
                "parse error, expected at least {} disk stat elements, found {}",
                MIN_FIELDS,
                values.len()
            ));
        }
        if values[2].starts_with("loop") || values[1].ne("0") {
            // Filter out the loopback io devices.
            // Only look at raw device (filter partitions)
            // NOTE(review): whole disks with a non-zero minor (e.g. `sdb` at 8:16)
            // are dropped by this test as well.
            continue;
        }

        stats.num_disks += 1;
        stats.reads_completed += field(&values, 3)?;
        stats.reads_merged += field(&values, 4)?;
        stats.sectors_read += field(&values, 5)?;
        stats.time_reading_ms += field(&values, 6)?;
        stats.writes_completed += field(&values, 7)?;
        stats.writes_merged += field(&values, 8)?;
        stats.sectors_written += field(&values, 9)?;
        stats.time_writing_ms += field(&values, 10)?;
        stats.io_in_progress += field(&values, 11)?;
        stats.time_io_ms += field(&values, 12)?;
        stats.time_io_weighted_ms += field(&values, 13)?;
        // Discard counters: absent on kernels that predate them.
        stats.discards_completed += field(&values, 14)?;
        stats.discards_merged += field(&values, 15)?;
        stats.sectors_discarded += field(&values, 16)?;
        stats.time_discarding += field(&values, 17)?;
        // Flush counters: absent on kernels that predate them.
        stats.flushes_completed += field(&values, 18)?;
        stats.time_flushing += field(&values, 19)?;
    }

    Ok(stats)
}
impl SystemMonitorService {
pub fn new(
exit: Arc<AtomicBool>,
report_os_memory_stats: bool,
report_os_network_stats: bool,
report_os_cpu_stats: bool,
report_os_disk_stats: bool,
) -> Self {
info!("Starting SystemMonitorService");
let thread_hdl = Builder::new()
@ -239,6 +317,7 @@ impl SystemMonitorService {
report_os_memory_stats,
report_os_network_stats,
report_os_cpu_stats,
report_os_disk_stats,
);
})
.unwrap();
@ -572,17 +651,155 @@ impl SystemMonitorService {
}
}
/// Takes a fresh disk-stats sample and, when an earlier sample is stored in
/// `disk_stats`, reports the two through `report_disk_stats`. The fresh sample
/// then replaces the stored one.
///
/// If reading fails, a warning is logged and `disk_stats` is left untouched.
#[cfg(target_os = "linux")]
fn process_disk_stats(disk_stats: &mut Option<DiskStats>) {
    let new_stats = match read_disk_stats() {
        Ok(sample) => sample,
        Err(e) => {
            warn!("read_disk_stats: {}", e);
            return;
        }
    };
    // The very first sample has nothing to be compared against, so it is only stored.
    if let Some(old_stats) = disk_stats.as_ref() {
        Self::report_disk_stats(old_stats, &new_stats);
    }
    *disk_stats = Some(new_stats);
}
// Non-Linux builds: the disk statistics reader is only compiled for Linux, so
// this variant deliberately does nothing and leaves `_disk_stats` untouched.
#[cfg(not(target_os = "linux"))]
fn process_disk_stats(_disk_stats: &mut Option<DiskStats>) {}
/// Emits one `disk-stats` datapoint describing the change between two samples.
///
/// Every counter is reported as `new - old`, clamped at zero by
/// `saturating_sub`. `io_in_progress` and `num_disks` are the exceptions:
/// they are reported as the plain values of `new_stats`.
#[cfg(target_os = "linux")]
fn report_disk_stats(old_stats: &DiskStats, new_stats: &DiskStats) {
    // Growth of a counter between the two samples; never underflows.
    let delta = |new: u64, old: u64| new.saturating_sub(old);
    datapoint_info!(
        "disk-stats",
        (
            "reads_completed",
            delta(new_stats.reads_completed, old_stats.reads_completed),
            i64
        ),
        (
            "reads_merged",
            delta(new_stats.reads_merged, old_stats.reads_merged),
            i64
        ),
        (
            "sectors_read",
            delta(new_stats.sectors_read, old_stats.sectors_read),
            i64
        ),
        (
            "time_reading_ms",
            delta(new_stats.time_reading_ms, old_stats.time_reading_ms),
            i64
        ),
        (
            "writes_completed",
            delta(new_stats.writes_completed, old_stats.writes_completed),
            i64
        ),
        (
            "writes_merged",
            delta(new_stats.writes_merged, old_stats.writes_merged),
            i64
        ),
        (
            "sectors_written",
            delta(new_stats.sectors_written, old_stats.sectors_written),
            i64
        ),
        (
            "time_writing_ms",
            delta(new_stats.time_writing_ms, old_stats.time_writing_ms),
            i64
        ),
        // Instantaneous value, not a counter.
        ("io_in_progress", new_stats.io_in_progress, i64),
        (
            "time_io_ms",
            delta(new_stats.time_io_ms, old_stats.time_io_ms),
            i64
        ),
        (
            "time_io_weighted_ms",
            delta(
                new_stats.time_io_weighted_ms,
                old_stats.time_io_weighted_ms
            ),
            i64
        ),
        (
            "discards_completed",
            delta(new_stats.discards_completed, old_stats.discards_completed),
            i64
        ),
        (
            "discards_merged",
            delta(new_stats.discards_merged, old_stats.discards_merged),
            i64
        ),
        (
            "sectors_discarded",
            delta(new_stats.sectors_discarded, old_stats.sectors_discarded),
            i64
        ),
        (
            "time_discarding",
            delta(new_stats.time_discarding, old_stats.time_discarding),
            i64
        ),
        (
            "flushes_completed",
            delta(new_stats.flushes_completed, old_stats.flushes_completed),
            i64
        ),
        (
            "time_flushing",
            delta(new_stats.time_flushing, old_stats.time_flushing),
            i64
        ),
        // Instantaneous value, not a counter.
        ("num_disks", new_stats.num_disks, i64),
    )
}
pub fn run(
exit: Arc<AtomicBool>,
report_os_memory_stats: bool,
report_os_network_stats: bool,
report_os_cpu_stats: bool,
report_os_disk_stats: bool,
) {
let mut udp_stats = None;
let mut disk_stats = None;
let network_limits_timer = AtomicInterval::default();
let udp_timer = AtomicInterval::default();
let mem_timer = AtomicInterval::default();
let cpu_timer = AtomicInterval::default();
let disk_timer = AtomicInterval::default();
loop {
if exit.load(Ordering::Relaxed) {
@ -602,6 +819,9 @@ impl SystemMonitorService {
if report_os_cpu_stats && cpu_timer.should_update(SAMPLE_INTERVAL_CPU_MS) {
Self::report_cpu_stats();
}
if report_os_disk_stats && disk_timer.should_update(SAMPLE_INTERVAL_DISK_MS) {
Self::process_disk_stats(&mut disk_stats);
}
sleep(SLEEP_INTERVAL);
}
}
@ -670,6 +890,41 @@ data" as &[u8];
assert!(stats.is_err());
}
// Checks that `parse_disk_stats` sums only the accepted rows of a realistic
// `/proc/diskstats` dump and that it rejects input that is not in that format.
#[test]
fn test_parse_disk_stats() {
// Fixture: loop devices (`loopN`) and partitions (`sda1`, `sda14`, `sda15`) must be
// skipped; only the `sda` and `sdb` rows (minor number 0) are expected to be counted.
const MOCK_DISK: &[u8] =
b" 7 0 loop0 108 0 2906 27 0 0 0 0 0 40 27 0 0 0 0 0 0
7 1 loop1 48 0 716 23 0 0 0 0 0 28 23 0 0 0 0 0 0
7 2 loop2 108 0 2916 21 0 0 0 0 0 36 21 0 0 0 0 0 0
7 3 loop3 257 0 4394 131 0 0 0 0 0 296 131 0 0 0 0 0 0
7 4 loop4 111 0 2896 62 0 0 0 0 0 68 62 0 0 0 0 0 0
7 5 loop5 110 0 2914 138 0 0 0 0 0 112 138 0 0 0 0 0 0
7 6 loop6 68 0 2200 47 0 0 0 0 0 44 47 0 0 0 0 0 0
7 7 loop7 1397 0 101686 515 0 0 0 0 0 4628 515 0 0 0 0 0 0
8 0 sda 40883273 294820 1408426268 30522643 352908152 204249001 37827695922 2073754124 0 86054536 2105005805 496399 4 1886486166 167545 18008621 561492
8 1 sda1 40882879 291543 1408408989 30522451 352908150 204249001 37827695920 2073754122 0 86054508 2104444115 496393 0 1886085576 167541 0 0
8 14 sda14 73 0 832 22 0 0 0 0 0 48 22 0 0 0 0 0 0
8 15 sda15 146 3277 9855 62 2 0 2 1 0 76 68 6 4 400590 3 0 0
7 9 loop9 55 0 2106 41 0 0 0 0 0 28 41 0 0 0 0 0 0
7 8 loop8 41 0 688 53 0 0 0 0 0 44 53 0 0 0 0 0 0
7 10 loop10 60 0 748 1 0 0 0 0 0 20 1 0 0 0 0 0 0
9 0 sdb 1 1 1 1 352908152 204249001 37827695922 2073754124 0 86054536 2105005805 496399 4 1886486166 167545 18008621 561492" as &[u8];
// Fixture: every line holds a single token, far fewer columns than a diskstats row.
const UNEXPECTED_DATA: &[u8] = b"un
ex
pec
ted
data" as &[u8];
let mut mock_disk = MOCK_DISK;
let stats = parse_disk_stats(&mut mock_disk).unwrap();
// 40883273 (sda) + 1 (sdb)
assert_eq!(stats.reads_completed, 40883274);
// 561492 (sda) + 561492 (sdb)
assert_eq!(stats.time_flushing, 1122984);
let mut mock_disk = UNEXPECTED_DATA;
let stats = parse_disk_stats(&mut mock_disk);
assert!(stats.is_err());
}
#[test]
fn test_calc_percent() {
assert!(SystemMonitorService::calc_percent(99, 100) < 100.0);

View File

@ -161,6 +161,7 @@ pub struct ValidatorConfig {
pub no_os_memory_stats_reporting: bool,
pub no_os_network_stats_reporting: bool,
pub no_os_cpu_stats_reporting: bool,
pub no_os_disk_stats_reporting: bool,
pub poh_pinned_cpu_core: usize,
pub poh_hashes_per_batch: u64,
pub account_indexes: AccountSecondaryIndexes,
@ -223,6 +224,7 @@ impl Default for ValidatorConfig {
no_os_memory_stats_reporting: true,
no_os_network_stats_reporting: true,
no_os_cpu_stats_reporting: true,
no_os_disk_stats_reporting: true,
poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE,
poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH,
account_indexes: AccountSecondaryIndexes::default(),
@ -504,6 +506,7 @@ impl Validator {
!config.no_os_memory_stats_reporting,
!config.no_os_network_stats_reporting,
!config.no_os_cpu_stats_reporting,
!config.no_os_disk_stats_reporting,
));
let (poh_timing_point_sender, poh_timing_point_receiver) = unbounded();

View File

@ -2397,6 +2397,7 @@ fn main() {
!no_os_memory_stats_reporting,
false,
false,
false,
);
accounts_index_config.index_limit_mb = if let Some(limit) =

View File

@ -48,6 +48,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
no_os_memory_stats_reporting: config.no_os_memory_stats_reporting,
no_os_network_stats_reporting: config.no_os_network_stats_reporting,
no_os_cpu_stats_reporting: config.no_os_cpu_stats_reporting,
no_os_disk_stats_reporting: config.no_os_disk_stats_reporting,
poh_pinned_cpu_core: config.poh_pinned_cpu_core,
account_indexes: config.account_indexes.clone(),
accounts_db_caching_enabled: config.accounts_db_caching_enabled,

View File

@ -971,6 +971,11 @@ pub fn main() {
.long("no-os-cpu-stats-reporting")
.help("Disable reporting of OS CPU statistics.")
)
.arg(
Arg::with_name("no_os_disk_stats_reporting")
.long("no-os-disk-stats-reporting")
.help("Disable reporting of OS disk statistics.")
)
.arg(
Arg::with_name("accounts-hash-interval-slots")
.long("accounts-hash-interval-slots")
@ -2641,6 +2646,7 @@ pub fn main() {
no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
no_os_network_stats_reporting: matches.is_present("no_os_network_stats_reporting"),
no_os_cpu_stats_reporting: matches.is_present("no_os_cpu_stats_reporting"),
no_os_disk_stats_reporting: matches.is_present("no_os_disk_stats_reporting"),
poh_pinned_cpu_core: value_of(&matches, "poh_pinned_cpu_core")
.unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE),
poh_hashes_per_batch: value_of(&matches, "poh_hashes_per_batch")