From f3b760dd916a87812659424a19e83981f80a13eb Mon Sep 17 00:00:00 2001 From: Brennan Watt Date: Tue, 2 Aug 2022 14:29:53 -0700 Subject: [PATCH] Add IO metrics (#26804) * Add Disk IO metrics --- core/src/system_monitor_service.rs | 255 +++++++++++++++++++++++++ core/src/validator.rs | 3 + ledger-tool/src/main.rs | 1 + local-cluster/src/validator_configs.rs | 1 + validator/src/main.rs | 6 + 5 files changed, 266 insertions(+) diff --git a/core/src/system_monitor_service.rs b/core/src/system_monitor_service.rs index e001a141dc..e1e75e521a 100644 --- a/core/src/system_monitor_service.rs +++ b/core/src/system_monitor_service.rs @@ -22,12 +22,15 @@ const SAMPLE_INTERVAL_UDP_MS: u64 = 2 * MS_PER_S; const SAMPLE_INTERVAL_OS_NETWORK_LIMITS_MS: u64 = MS_PER_H; const SAMPLE_INTERVAL_MEM_MS: u64 = MS_PER_S; const SAMPLE_INTERVAL_CPU_MS: u64 = MS_PER_S; +const SAMPLE_INTERVAL_DISK_MS: u64 = MS_PER_S; const SLEEP_INTERVAL: Duration = Duration::from_millis(500); #[cfg(target_os = "linux")] const PROC_NET_SNMP_PATH: &str = "/proc/net/snmp"; #[cfg(target_os = "linux")] const PROC_NET_DEV_PATH: &str = "/proc/net/dev"; +#[cfg(target_os = "linux")] +const PROC_DISKSTATS_PATH: &str = "/proc/diskstats"; pub struct SystemMonitorService { thread_hdl: JoinHandle<()>, @@ -96,6 +99,32 @@ struct CpuInfo { num_threads: u64, } +#[derive(Default)] +#[cfg_attr(not(target_os = "linux"), allow(dead_code))] +// These stats are aggregated across all storage devices excluding internal loopbacks. +// Fields are cumulative since boot with the exception of 'num_disks' and 'io_in_progress' +struct DiskStats { + reads_completed: u64, + reads_merged: u64, + sectors_read: u64, + time_reading_ms: u64, + writes_completed: u64, + writes_merged: u64, + sectors_written: u64, + time_writing_ms: u64, + io_in_progress: u64, + time_io_ms: u64, + // weighted time multiplies time performing IO by number of commands in the queue + time_io_weighted_ms: u64, + discards_completed: u64, + discards_merged: u64, + sectors_discarded: u64, + time_discarding: u64, + flushes_completed: u64, + time_flushing: u64, + num_disks: u64, +} + impl UdpStats { fn from_map(udp_stats: &HashMap) -> Self { Self { @@ -223,12 +252,61 @@ pub fn verify_net_stats_access() -> Result<(), String> { Ok(()) } +#[cfg(target_os = "linux")] +fn read_disk_stats() -> Result { + let file_path_diskstats = PROC_DISKSTATS_PATH; + let file_diskstats = File::open(file_path_diskstats).map_err(|e| e.to_string())?; + let mut reader_diskstats = BufReader::new(file_diskstats); + parse_disk_stats(&mut reader_diskstats) +} + +#[cfg_attr(not(target_os = "linux"), allow(dead_code))] +fn parse_disk_stats(reader_diskstats: &mut impl BufRead) -> Result { + let mut stats = DiskStats::default(); + let mut num_disks = 0; + for line in reader_diskstats.lines() { + let line = line.map_err(|e| e.to_string())?; + let values: Vec<_> = line.split_ascii_whitespace().collect(); + + if values.len() != 20 { + return Err("parse error, expected exactly 20 disk stat elements".to_string()); + } + if values[2].starts_with("loop") || values[1].ne("0") { + // Filter out the loopback io devices. + // Only look at raw device (filter partitions) + continue; + } + + num_disks += 1; + stats.reads_completed += values[3].parse::().map_err(|e| e.to_string())?; + stats.reads_merged += values[4].parse::().map_err(|e| e.to_string())?; + stats.sectors_read += values[5].parse::().map_err(|e| e.to_string())?; + stats.time_reading_ms += values[6].parse::().map_err(|e| e.to_string())?; + stats.writes_completed += values[7].parse::().map_err(|e| e.to_string())?; + stats.writes_merged += values[8].parse::().map_err(|e| e.to_string())?; + stats.sectors_written += values[9].parse::().map_err(|e| e.to_string())?; + stats.time_writing_ms += values[10].parse::().map_err(|e| e.to_string())?; + stats.io_in_progress += values[11].parse::().map_err(|e| e.to_string())?; + stats.time_io_ms += values[12].parse::().map_err(|e| e.to_string())?; + stats.time_io_weighted_ms += values[13].parse::().map_err(|e| e.to_string())?; + stats.discards_completed += values[14].parse::().map_err(|e| e.to_string())?; + stats.discards_merged += values[15].parse::().map_err(|e| e.to_string())?; + stats.sectors_discarded += values[16].parse::().map_err(|e| e.to_string())?; + stats.time_discarding += values[17].parse::().map_err(|e| e.to_string())?; + stats.flushes_completed += values[18].parse::().map_err(|e| e.to_string())?; + stats.time_flushing += values[19].parse::().map_err(|e| e.to_string())?; + } + stats.num_disks = num_disks; + Ok(stats) +} + impl SystemMonitorService { pub fn new( exit: Arc, report_os_memory_stats: bool, report_os_network_stats: bool, report_os_cpu_stats: bool, + report_os_disk_stats: bool, ) -> Self { info!("Starting SystemMonitorService"); let thread_hdl = Builder::new() @@ -239,6 +317,7 @@ impl SystemMonitorService { report_os_memory_stats, report_os_network_stats, report_os_cpu_stats, + report_os_disk_stats, ); }) .unwrap(); @@ -572,17 +651,155 @@ impl SystemMonitorService { } } + #[cfg(target_os = "linux")] + fn process_disk_stats(disk_stats: &mut Option) { + match read_disk_stats() { + Ok(new_stats) => { + if let Some(old_stats) = disk_stats { + Self::report_disk_stats(old_stats, &new_stats); + } + *disk_stats = Some(new_stats); + } + Err(e) => warn!("read_disk_stats: {}", e), + } + } + + #[cfg(not(target_os = "linux"))] + fn process_disk_stats(_disk_stats: &mut Option) {} + + #[cfg(target_os = "linux")] + fn report_disk_stats(old_stats: &DiskStats, new_stats: &DiskStats) { + datapoint_info!( + "disk-stats", + ( + "reads_completed", + new_stats + .reads_completed + .saturating_sub(old_stats.reads_completed), + i64 + ), + ( + "reads_merged", + new_stats + .reads_merged + .saturating_sub(old_stats.reads_merged), + i64 + ), + ( + "sectors_read", + new_stats + .sectors_read + .saturating_sub(old_stats.sectors_read), + i64 + ), + ( + "time_reading_ms", + new_stats + .time_reading_ms + .saturating_sub(old_stats.time_reading_ms), + i64 + ), + ( + "writes_completed", + new_stats + .writes_completed + .saturating_sub(old_stats.writes_completed), + i64 + ), + ( + "writes_merged", + new_stats + .writes_merged + .saturating_sub(old_stats.writes_merged), + i64 + ), + ( + "sectors_written", + new_stats + .sectors_written + .saturating_sub(old_stats.sectors_written), + i64 + ), + ( + "time_writing_ms", + new_stats + .time_writing_ms + .saturating_sub(old_stats.time_writing_ms), + i64 + ), + ("io_in_progress", new_stats.io_in_progress, i64), + ( + "time_io_ms", + new_stats.time_io_ms.saturating_sub(old_stats.time_io_ms), + i64 + ), + ( + "time_io_weighted_ms", + new_stats + .time_io_weighted_ms + .saturating_sub(old_stats.time_io_weighted_ms), + i64 + ), + ( + "discards_completed", + new_stats + .discards_completed + .saturating_sub(old_stats.discards_completed), + i64 + ), + ( + "discards_merged", + new_stats + .discards_merged + .saturating_sub(old_stats.discards_merged), + i64 + ), + ( + "sectors_discarded", + new_stats + .sectors_discarded + .saturating_sub(old_stats.sectors_discarded), + i64 + ), + ( + "time_discarding", + new_stats + .time_discarding + .saturating_sub(old_stats.time_discarding), + i64 + ), + ( + "flushes_completed", + new_stats + .flushes_completed + .saturating_sub(old_stats.flushes_completed), + i64 + ), + ( + "time_flushing", + new_stats + .time_flushing + .saturating_sub(old_stats.time_flushing), + i64 + ), + ("num_disks", new_stats.num_disks, i64), + ) + } + pub fn run( exit: Arc, report_os_memory_stats: bool, report_os_network_stats: bool, report_os_cpu_stats: bool, + report_os_disk_stats: bool, ) { let mut udp_stats = None; + let mut disk_stats = None; let network_limits_timer = AtomicInterval::default(); let udp_timer = AtomicInterval::default(); let mem_timer = AtomicInterval::default(); let cpu_timer = AtomicInterval::default(); + let disk_timer = AtomicInterval::default(); loop { if exit.load(Ordering::Relaxed) { @@ -602,6 +819,9 @@ impl SystemMonitorService { if report_os_cpu_stats && cpu_timer.should_update(SAMPLE_INTERVAL_CPU_MS) { Self::report_cpu_stats(); } + if report_os_disk_stats && disk_timer.should_update(SAMPLE_INTERVAL_DISK_MS) { + Self::process_disk_stats(&mut disk_stats); + } sleep(SLEEP_INTERVAL); } } @@ -670,6 +890,41 @@ data" as &[u8]; assert!(stats.is_err()); } + #[test] + fn test_parse_disk_stats() { + const MOCK_DISK: &[u8] = +b" 7 0 loop0 108 0 2906 27 0 0 0 0 0 40 27 0 0 0 0 0 0 +7 1 loop1 48 0 716 23 0 0 0 0 0 28 23 0 0 0 0 0 0 +7 2 loop2 108 0 2916 21 0 0 0 0 0 36 21 0 0 0 0 0 0 +7 3 loop3 257 0 4394 131 0 0 0 0 0 296 131 0 0 0 0 0 0 +7 4 loop4 111 0 2896 62 0 0 0 0 0 68 62 0 0 0 0 0 0 +7 5 loop5 110 0 2914 138 0 0 0 0 0 112 138 0 0 0 0 0 0 +7 6 loop6 68 0 2200 47 0 0 0 0 0 44 47 0 0 0 0 0 0 +7 7 loop7 1397 0 101686 515 0 0 0 0 0 4628 515 0 0 0 0 0 0 +8 0 sda 40883273 294820 1408426268 30522643 352908152 204249001 37827695922 2073754124 0 86054536 2105005805 496399 4 1886486166 167545 18008621 561492 +8 1 sda1 40882879 291543 1408408989 30522451 352908150 204249001 37827695920 2073754122 0 86054508 2104444115 496393 0 1886085576 167541 0 0 +8 14 sda14 73 0 832 22 0 0 0 0 0 48 22 0 0 0 0 0 0 +8 15 sda15 146 3277 9855 62 2 0 2 1 0 76 68 6 4 400590 3 0 0 +7 9 loop9 55 0 2106 41 0 0 0 0 0 28 41 0 0 0 0 0 0 +7 8 loop8 41 0 688 53 0 0 0 0 0 44 53 0 0 0 0 0 0 +7 10 loop10 60 0 748 1 0 0 0 0 0 20 1 0 0 0 0 0 0 +9 0 sdb 1 1 1 1 352908152 204249001 37827695922 2073754124 0 86054536 2105005805 496399 4 1886486166 167545 18008621 561492" as &[u8]; + const UNEXPECTED_DATA: &[u8] = b"un +ex +pec +ted +data" as &[u8]; + + let mut mock_disk = MOCK_DISK; + let stats = parse_disk_stats(&mut mock_disk).unwrap(); + assert_eq!(stats.reads_completed, 40883274); + assert_eq!(stats.time_flushing, 1122984); + + let mut mock_disk = UNEXPECTED_DATA; + let stats = parse_disk_stats(&mut mock_disk); + assert!(stats.is_err()); + } + #[test] fn test_calc_percent() { assert!(SystemMonitorService::calc_percent(99, 100) < 100.0); diff --git a/core/src/validator.rs b/core/src/validator.rs index 240d0ade2a..0e91be7c71 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -161,6 +161,7 @@ pub struct ValidatorConfig { pub no_os_memory_stats_reporting: bool, pub no_os_network_stats_reporting: bool, pub no_os_cpu_stats_reporting: bool, + pub no_os_disk_stats_reporting: bool, pub poh_pinned_cpu_core: usize, pub poh_hashes_per_batch: u64, pub account_indexes: AccountSecondaryIndexes, @@ -223,6 +224,7 @@ impl Default for ValidatorConfig { no_os_memory_stats_reporting: true, no_os_network_stats_reporting: true, no_os_cpu_stats_reporting: true, + no_os_disk_stats_reporting: true, poh_pinned_cpu_core: poh_service::DEFAULT_PINNED_CPU_CORE, poh_hashes_per_batch: poh_service::DEFAULT_HASHES_PER_BATCH, account_indexes: AccountSecondaryIndexes::default(), @@ -504,6 +506,7 @@ impl Validator { !config.no_os_memory_stats_reporting, !config.no_os_network_stats_reporting, !config.no_os_cpu_stats_reporting, + !config.no_os_disk_stats_reporting, )); let (poh_timing_point_sender, poh_timing_point_receiver) = unbounded(); diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index a91e9888d6..6f29200a41 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -2397,6 +2397,7 @@ fn main() { !no_os_memory_stats_reporting, false, false, + false, ); accounts_index_config.index_limit_mb = if let Some(limit) = diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index cf4d8d7754..e717b46815 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -48,6 +48,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { no_os_memory_stats_reporting: config.no_os_memory_stats_reporting, no_os_network_stats_reporting: config.no_os_network_stats_reporting, no_os_cpu_stats_reporting: config.no_os_cpu_stats_reporting, + no_os_disk_stats_reporting: config.no_os_disk_stats_reporting, poh_pinned_cpu_core: config.poh_pinned_cpu_core, account_indexes: config.account_indexes.clone(), accounts_db_caching_enabled: config.accounts_db_caching_enabled, diff --git a/validator/src/main.rs b/validator/src/main.rs index 1e99b79234..ce8c12c06f 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -971,6 +971,11 @@ pub fn main() { .long("no-os-cpu-stats-reporting") .help("Disable reporting of OS CPU statistics.") ) + .arg( + Arg::with_name("no_os_disk_stats_reporting") + .long("no-os-disk-stats-reporting") + .help("Disable reporting of OS disk statistics.") + ) .arg( Arg::with_name("accounts-hash-interval-slots") .long("accounts-hash-interval-slots") @@ -2641,6 +2646,7 @@ pub fn main() { no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"), no_os_network_stats_reporting: matches.is_present("no_os_network_stats_reporting"), no_os_cpu_stats_reporting: matches.is_present("no_os_cpu_stats_reporting"), + no_os_disk_stats_reporting: matches.is_present("no_os_disk_stats_reporting"), poh_pinned_cpu_core: value_of(&matches, "poh_pinned_cpu_core") .unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE), poh_hashes_per_batch: value_of(&matches, "poh_hashes_per_batch")