diff --git a/core/src/lib.rs b/core/src/lib.rs index 99c351e204..6a758974cf 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -49,6 +49,7 @@ pub mod sigverify; pub mod sigverify_shreds; pub mod sigverify_stage; pub mod snapshot_packager_service; +pub mod system_monitor_service; pub mod test_validator; pub mod tower_storage; pub mod tpu; diff --git a/core/src/system_monitor_service.rs b/core/src/system_monitor_service.rs new file mode 100644 index 0000000000..d310aba208 --- /dev/null +++ b/core/src/system_monitor_service.rs @@ -0,0 +1,227 @@ +use std::{ + collections::HashMap, + io::BufRead, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, sleep, Builder, JoinHandle}, + time::{Duration, Instant}, +}; + +#[cfg(target_os = "linux")] +use std::{fs::File, io::BufReader, path::Path}; + +const SAMPLE_INTERVAL: Duration = Duration::from_secs(60); +const SLEEP_INTERVAL: Duration = Duration::from_millis(500); + +#[cfg(target_os = "linux")] +const PROC_NET_SNMP_PATH: &str = "/proc/net/snmp"; + +pub struct SystemMonitorService { + thread_hdl: JoinHandle<()>, +} + +#[cfg_attr(not(target_os = "linux"), allow(dead_code))] +struct UdpStats { + in_datagrams: usize, + no_ports: usize, + in_errors: usize, + out_datagrams: usize, + rcvbuf_errors: usize, + sndbuf_errors: usize, + in_csum_errors: usize, + ignored_multi: usize, +} + +#[cfg(target_os = "linux")] +fn read_udp_stats(file_path: impl AsRef) -> Result { + let file = File::open(file_path).map_err(|e| e.to_string())?; + let mut reader = BufReader::new(file); + parse_udp_stats(&mut reader) +} + +#[cfg_attr(not(target_os = "linux"), allow(dead_code))] +fn parse_udp_stats(reader: &mut impl BufRead) -> Result { + let mut udp_lines = Vec::default(); + for line in reader.lines() { + let line = line.map_err(|e| e.to_string())?; + if line.starts_with("Udp:") { + udp_lines.push(line); + if udp_lines.len() == 2 { + break; + } + } + } + if udp_lines.len() != 2 { + return Err(format!( + "parse error, expected 2 lines, num lines: {}", + udp_lines.len() + )); + } + + let pairs: Vec<_> = udp_lines[0] + .split_ascii_whitespace() + .zip(udp_lines[1].split_ascii_whitespace()) + .collect(); + let udp_stats: HashMap = pairs[1..] + .iter() + .map(|(label, val)| (label.to_string(), val.parse::().unwrap())) + .collect(); + + let stats = UdpStats { + in_datagrams: *udp_stats.get("InDatagrams").unwrap_or(&0), + no_ports: *udp_stats.get("NoPorts").unwrap_or(&0), + in_errors: *udp_stats.get("InErrors").unwrap_or(&0), + out_datagrams: *udp_stats.get("OutDatagrams").unwrap_or(&0), + rcvbuf_errors: *udp_stats.get("RcvbufErrors").unwrap_or(&0), + sndbuf_errors: *udp_stats.get("SndbufErrors").unwrap_or(&0), + in_csum_errors: *udp_stats.get("InCsumErrors").unwrap_or(&0), + ignored_multi: *udp_stats.get("IgnoredMulti").unwrap_or(&0), + }; + + Ok(stats) +} + +#[cfg(target_os = "linux")] +pub fn verify_udp_stats_access() -> Result<(), String> { + read_udp_stats(PROC_NET_SNMP_PATH)?; + Ok(()) +} + +#[cfg(not(target_os = "linux"))] +pub fn verify_udp_stats_access() -> Result<(), String> { + Ok(()) +} + +impl SystemMonitorService { + pub fn new(exit: Arc) -> Self { + info!("Starting SystemMonitorService"); + let thread_hdl = Builder::new() + .name("system-monitor".to_string()) + .spawn(move || { + Self::run(exit); + }) + .unwrap(); + + Self { thread_hdl } + } + + #[cfg(target_os = "linux")] + fn process_udp_stats(udp_stats: &mut Option) { + match read_udp_stats(PROC_NET_SNMP_PATH) { + Ok(new_stats) => { + if let Some(old_stats) = udp_stats { + SystemMonitorService::report_udp_stats(old_stats, &new_stats); + } + *udp_stats = Some(new_stats); + } + Err(e) => warn!("read_udp_stats: {}", e), + } + } + + #[cfg(not(target_os = "linux"))] + fn process_udp_stats(_udp_stats: &mut Option) {} + + #[cfg(target_os = "linux")] + fn report_udp_stats(old_stats: &UdpStats, new_stats: &UdpStats) { + datapoint_info!( + "net-stats", + ( + "in_datagrams_delta", + new_stats.in_datagrams - old_stats.in_datagrams, + i64 + ), + ( + "no_ports_delta", + new_stats.no_ports - old_stats.no_ports, + i64 + ), + ( + "in_errors_delta", + new_stats.in_errors - old_stats.in_errors, + i64 + ), + ( + "out_datagrams_delta", + new_stats.out_datagrams - old_stats.out_datagrams, + i64 + ), + ( + "rcvbuf_errors_delta", + new_stats.rcvbuf_errors - old_stats.rcvbuf_errors, + i64 + ), + ( + "sndbuf_errors_delta", + new_stats.sndbuf_errors - old_stats.sndbuf_errors, + i64 + ), + ( + "in_csum_errors_delta", + new_stats.in_csum_errors - old_stats.in_csum_errors, + i64 + ), + ( + "ignored_multi_delta", + new_stats.ignored_multi - old_stats.ignored_multi, + i64 + ), + ("in_errors", new_stats.in_errors, i64), + ("rcvbuf_errors", new_stats.rcvbuf_errors, i64), + ("sndbuf_errors", new_stats.sndbuf_errors, i64), + ); + } + + pub fn run(exit: Arc) { + let mut udp_stats = None; + + let mut now = Instant::now(); + loop { + if exit.load(Ordering::Relaxed) { + break; + } + + if now.elapsed() >= SAMPLE_INTERVAL { + now = Instant::now(); + + SystemMonitorService::process_udp_stats(&mut udp_stats); + } + + sleep(SLEEP_INTERVAL); + } + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_udp_stats() { + let mut mock_snmp = +b"Ip: Forwarding DefaultTTL InReceives InHdrErrors InAddrErrors ForwDatagrams InUnknownProtos InDiscards InDelivers OutRequests OutDiscards OutNoRoutes ReasmTimeout ReasmReqds ReasmOKs ReasmFails FragOKs FragFails FragCreates +Ip: 1 64 357 0 2 0 0 0 355 315 0 6 0 0 0 0 0 0 0 +Icmp: InMsgs InErrors InCsumErrors InDestUnreachs InTimeExcds InParmProbs InSrcQuenchs InRedirects InEchos InEchoReps InTimestamps InTimestampReps InAddrMasks InAddrMaskReps OutMsgs OutErrors OutDestUnreachs OutTimeExcds OutParmProbs OutSrcQuenchs OutRedirects OutEchos OutEchoReps OutTimestamps OutTimestampReps OutAddrMasks OutAddrMaskReps +Icmp: 3 0 0 3 0 0 0 0 0 0 0 0 0 0 7 0 7 0 0 0 0 0 0 0 0 0 0 +IcmpMsg: InType3 OutType3 +IcmpMsg: 3 7 +Tcp: RtoAlgorithm RtoMin RtoMax MaxConn ActiveOpens PassiveOpens AttemptFails EstabResets CurrEstab InSegs OutSegs RetransSegs InErrs OutRsts InCsumErrors +Tcp: 1 200 120000 -1 29 1 0 0 5 318 279 0 0 4 0 +Udp: InDatagrams NoPorts InErrors OutDatagrams RcvbufErrors SndbufErrors InCsumErrors IgnoredMulti +Udp: 27 7 0 30 0 0 0 0 +UdpLite: InDatagrams NoPorts InErrors OutDatagrams RcvbufErrors SndbufErrors InCsumErrors IgnoredMulti +UdpLite: 0 0 0 0 0 0 0 0" as &[u8]; + let stats = parse_udp_stats(&mut mock_snmp).unwrap(); + assert_eq!(stats.out_datagrams, 30); + assert_eq!(stats.no_ports, 7); + + let mut mock_snmp = b"unexpected data" as &[u8]; + let stats = parse_udp_stats(&mut mock_snmp); + assert!(stats.is_err()); + } +} diff --git a/core/src/validator.rs b/core/src/validator.rs index bc6b3d3caf..2ec859a7c0 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -14,6 +14,7 @@ use { serve_repair_service::ServeRepairService, sigverify, snapshot_packager_service::SnapshotPackagerService, + system_monitor_service::{verify_udp_stats_access, SystemMonitorService}, tower_storage::TowerStorage, tpu::{Tpu, DEFAULT_TPU_COALESCE_MS}, tvu::{Sockets, Tvu, TvuConfig}, @@ -275,6 +276,7 @@ pub struct Validator { transaction_status_service: Option, rewards_recorder_service: Option, cache_block_meta_service: Option, + system_monitor_service: Option, sample_performance_service: Option, gossip_service: GossipService, serve_repair_service: ServeRepairService, @@ -448,6 +450,12 @@ impl Validator { *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices; + verify_udp_stats_access().unwrap_or_else(|err| { + error!("Failed to access UDP stats: {}", err); + abort(); + }); + let system_monitor_service = Some(SystemMonitorService::new(Arc::clone(&exit))); + let leader_schedule_cache = Arc::new(leader_schedule_cache); let bank = bank_forks.working_bank(); if let Some(ref shrink_paths) = config.account_shrink_paths { @@ -872,6 +880,7 @@ impl Validator { transaction_status_service, rewards_recorder_service, cache_block_meta_service, + system_monitor_service, sample_performance_service, snapshot_packager_service, completed_data_sets_service, @@ -966,6 +975,12 @@ impl Validator { .expect("cache_block_meta_service"); } + if let Some(system_monitor_service) = self.system_monitor_service { + system_monitor_service + .join() + .expect("system_monitor_service"); + } + if let Some(sample_performance_service) = self.sample_performance_service { sample_performance_service .join()