diff --git a/Cargo.toml b/Cargo.toml index c026295..f9304d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,4 +7,5 @@ members = [ "metrics-exporter-tcp", "metrics-exporter-prometheus", "metrics-tracing-context", + "metrics-observer", ] diff --git a/metrics-observer/Cargo.toml b/metrics-observer/Cargo.toml new file mode 100644 index 0000000..54990ae --- /dev/null +++ b/metrics-observer/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "metrics-observer" +version = "0.1.0" +authors = ["Toby Lawrence "] +edition = "2018" + +license = "MIT" + +[dependencies] +getopts = "0.2" +bytes = "0.5" +crossbeam-channel = "0.4" +prost = "0.6" +prost-types = "0.6" +tui = "0.12" +termion = "1.5" +hdrhistogram = "7.1" +evmap = "10.0" +chrono = "0.4" +metrics = { version = "0.13.0-alpha.5", path = "../metrics" } +metrics-util = { version = "0.4.0-alpha.3", path = "../metrics-util" } + +[build-dependencies] +prost-build = "0.6" +built = "0.4" \ No newline at end of file diff --git a/metrics-observer/build.rs b/metrics-observer/build.rs new file mode 100644 index 0000000..066cbe3 --- /dev/null +++ b/metrics-observer/build.rs @@ -0,0 +1,9 @@ +fn main() { + println!("cargo:rerun-if-changed=proto/event.proto"); + let mut prost_build = prost_build::Config::new(); + prost_build.btree_map(&["."]); + prost_build + .compile_protos(&["proto/event.proto"], &["proto/"]) + .unwrap(); + built::write_built_file().unwrap(); +} diff --git a/metrics-observer/proto/event.proto b/metrics-observer/proto/event.proto new file mode 100644 index 0000000..82280de --- /dev/null +++ b/metrics-observer/proto/event.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +package event.proto; + +message Metric { + string name = 1; + google.protobuf.Timestamp timestamp = 2; + map labels = 3; + oneof value { + Counter counter = 4; + Gauge gauge = 5; + Histogram histogram = 6; + } +} + +message Counter { + uint64 value = 1; +} + +message Gauge { + double value = 1; +} + +message Histogram { + uint64 value = 1; +} diff --git a/metrics-observer/src/input.rs b/metrics-observer/src/input.rs new file mode 100644 index 0000000..04b628e --- /dev/null +++ b/metrics-observer/src/input.rs @@ -0,0 +1,40 @@ +use std::io; +use std::thread; +use std::time::Duration; + +use crossbeam_channel::{bounded, Receiver, TrySendError, RecvTimeoutError}; +use termion::event::Key; +use termion::input::TermRead; + +pub struct InputEvents { + rx: Receiver, + handle: thread::JoinHandle<()>, +} + +impl InputEvents { + pub fn new() -> InputEvents { + let (tx, rx) = bounded(1); + let handle = thread::spawn(move || { + let stdin = io::stdin(); + for evt in stdin.keys() { + if let Ok(key) = evt { + // If our queue is full, we don't care. The user can just press the key again. + if let Err(TrySendError::Disconnected(_)) = tx.try_send(key) { + eprintln!("input event channel disconnected"); + return; + } + } + } + }); + + Self { rx, handle } + } + + pub fn next(&mut self) -> Result, RecvTimeoutError> { + match self.rx.recv_timeout(Duration::from_secs(1)) { + Ok(key) => Ok(Some(key)), + Err(RecvTimeoutError::Timeout) => Ok(None), + Err(e) => Err(e), + } + } +} \ No newline at end of file diff --git a/metrics-observer/src/main.rs b/metrics-observer/src/main.rs new file mode 100644 index 0000000..6dea223 --- /dev/null +++ b/metrics-observer/src/main.rs @@ -0,0 +1,154 @@ +use std::{error::Error, io}; + +use chrono::Local; +use termion::{event::Key, input::MouseTerminal, raw::IntoRawMode, screen::AlternateScreen}; +use tui::{ + backend::TermionBackend, + layout::{Constraint, Direction, Layout}, + style::{Color, Modifier, Style}, + text::{Span, Spans}, + widgets::{Block, Borders, Paragraph, Wrap, List, ListItem}, + Terminal, +}; + +mod input; +use self::input::InputEvents; + +mod metrics; +use self::metrics::{ClientState, MetricData}; + +mod selector; +use self::selector::Selector; + +fn main() -> Result<(), Box> { + let stdout = io::stdout().into_raw_mode()?; + let stdout = MouseTerminal::from(stdout); + let stdout = AlternateScreen::from(stdout); + let backend = TermionBackend::new(stdout); + let mut terminal = Terminal::new(backend)?; + + let mut events = InputEvents::new(); + let client = metrics::Client::new("127.0.0.1:5000".to_string()); + let mut selector = Selector::new(); + + loop { + terminal.draw(|f| { + let chunks = Layout::default() + .direction(Direction::Vertical) + .margin(1) + .constraints( + [ + Constraint::Length(4), + Constraint::Percentage(90) + ].as_ref() + ) + .split(f.size()); + + let current_dt = Local::now().format(" (%Y/%m/%d %I:%M:%S %p)").to_string(); + let client_state = match client.state() { + ClientState::Disconnected(s) => { + let mut spans = vec![ + Span::raw("state: "), + Span::styled("disconnected", Style::default().fg(Color::Red)), + ]; + + if let Some(s) = s { + spans.push(Span::raw(" ")); + spans.push(Span::raw(s)); + } + + Spans::from(spans) + }, + ClientState::Connected => Spans::from(vec![ + Span::raw("state: "), + Span::styled("connected", Style::default().fg(Color::Green)), + ]), + }; + + let header_block = Block::default() + .title(vec![ + Span::styled("metrics-observer", Style::default().add_modifier(Modifier::BOLD)), + Span::raw(current_dt), + ]) + .borders(Borders::ALL); + + let text = vec![ + client_state.into(), + Spans::from(vec![ + Span::styled("controls: ", Style::default().add_modifier(Modifier::BOLD)), + Span::raw("up/down = scroll, q = quit"), + ]), + ]; + let header = Paragraph::new(text) + .block(header_block) + .wrap(Wrap { trim: true }); + + f.render_widget(header, chunks[0]); + + // Knock 5 off the line width to account for 3-character highlight symbol + borders. + let line_width = chunks[1].width.saturating_sub(6) as usize; + let items = client.with_metrics(|metrics| { + let mut items = Vec::new(); + for (key, value) in metrics.iter() { + let inner_key = key.key(); + let name = inner_key.name(); + let labels = inner_key.labels().map(|label| format!("{} = {}", label.key(), label.value())).collect::>(); + let display_name = if labels.is_empty() { + name.to_string() + } else { + format!("{} [{}]", name, labels.join(", ")) + }; + + let display_value = match value { + MetricData::Counter(value) => format!("total: {}", value), + MetricData::Gauge(value) => format!("current: {}", value), + MetricData::Histogram(value) => { + let min = value.min(); + let max = value.max(); + let p50 = value.value_at_quantile(0.5); + let p99 = value.value_at_quantile(0.99); + let p999 = value.value_at_quantile(0.999); + + format!("min: {} p50: {} p99: {} p999: {} max: {}", + min, p50, p99, p999, max) + }, + }; + + let name_length = display_name.chars().count(); + let value_length = display_value.chars().count(); + let space = line_width.saturating_sub(name_length).saturating_sub(value_length); + + let display = format!("{}{}{}", display_name, " ".repeat(space), display_value); + items.push(ListItem::new(display)); + } + items + }); + selector.set_length(items.len()); + + let metrics_block = Block::default() + .title("observed metrics") + .borders(Borders::ALL); + + let metrics = List::new(items) + .block(metrics_block) + .highlight_symbol(">> "); + + f.render_stateful_widget(metrics, chunks[1], selector.state()); + })?; + + // Poll the event queue for input events. `next` will only block for 1 second, + // so our screen is never stale by more than 1 second. + if let Some(input) = events.next()? { + match input { + Key::Char('q') => break, + Key::Up => selector.previous(), + Key::Down => selector.next(), + Key::PageUp => selector.top(), + Key::PageDown => selector.bottom(), + _ => {}, + } + } + } + + Ok(()) +} \ No newline at end of file diff --git a/metrics-observer/src/metrics.rs b/metrics-observer/src/metrics.rs new file mode 100644 index 0000000..0d0a4b2 --- /dev/null +++ b/metrics-observer/src/metrics.rs @@ -0,0 +1,221 @@ +use std::collections::HashMap; +use std::net::TcpStream; +use std::time::Duration; +use std::thread; +use std::net::ToSocketAddrs; +use std::sync::{Arc, Mutex, RwLock}; +use std::io::Read; + +use bytes::{BufMut, BytesMut}; +use prost::Message; +use hdrhistogram::Histogram; + +use metrics::{Label, KeyData}; +use metrics_util::{CompositeKey, MetricKind}; + +mod proto { + include!(concat!(env!("OUT_DIR"), "/event.proto.rs")); +} + +#[derive(Clone)] +pub enum ClientState { + Disconnected(Option), + Connected, +} + +pub enum MetricData { + Counter(u64), + Gauge(f64), + Histogram(Histogram), +} + +pub struct Client { + state: Arc>, + metrics: Arc>>, + handle: thread::JoinHandle<()>, +} + +impl Client { + pub fn new(addr: String) -> Client { + let state = Arc::new(Mutex::new(ClientState::Disconnected(None))); + let metrics = Arc::new(RwLock::new(HashMap::new())); + let handle = { + let state = state.clone(); + let metrics = metrics.clone(); + thread::spawn(move || { + let mut runner = Runner::new(addr, state, metrics); + runner.run(); + }) + }; + + Client { + state, + metrics, + handle, + } + } + + pub fn state(&self) -> ClientState { + self.state.lock().unwrap().clone() + } + + pub fn with_metrics(&self, f: F) -> T + where + F: FnOnce(&HashMap) -> T, + { + let handle = self.metrics.read().unwrap(); + f(&handle) + } +} + +enum RunnerState { + Disconnected, + ErrorBackoff(&'static str, Duration), + Connected(TcpStream), +} + +struct Runner { + state: RunnerState, + addr: String, + client_state: Arc>, + metrics: Arc>>, +} + +impl Runner { + pub fn new( + addr: String, + state: Arc>, + metrics: Arc>>, + ) -> Runner { + Runner { + state: RunnerState::Disconnected, + addr, + client_state: state, + metrics, + } + } + + /*pub fn run(&mut self) { + let mut metrics = self.metrics.write().unwrap(); + + metrics.insert(("test_counter".into(), Vec::new()), MetricData::Counter(42)); + metrics.insert( + ("test_counter_two".into(), vec!["endpoint = http".to_string()]), + MetricData::Counter(42) + ); + metrics.insert(("test_gauge".into(), Vec::new()), MetricData::Gauge(-666)); + + let mut histogram = Histogram::::new(3) + .expect("failed to create histogram"); + for i in 1..100 { + histogram.record(i).expect("failed to record value"); + } + metrics.insert(("test_histogram".into(), Vec::new()), MetricData::Histogram(histogram)); + }*/ + + pub fn run(&mut self) { + loop { + let next = match self.state { + RunnerState::Disconnected => { + // Just reset the client state here to be sure. + { + let mut state = self.client_state.lock().unwrap(); + *state = ClientState::Disconnected(None); + } + + // Try to connect to our target and transition into Connected. + let addr = match self.addr.to_socket_addrs() { + Ok(mut addrs) => match addrs.next() { + Some(addr) => addr, + None => { + let mut state = self.client_state.lock().unwrap(); + *state = ClientState::Disconnected(Some("failed to resolve specified host".to_string())); + break; + } + } + Err(_) => { + let mut state = self.client_state.lock().unwrap(); + *state = ClientState::Disconnected(Some("failed to resolve specified host".to_string())); + break; + } + }; + match TcpStream::connect_timeout(&addr, Duration::from_secs(3)) { + Ok(stream) => RunnerState::Connected(stream), + Err(_) => { + RunnerState::ErrorBackoff("error while connecting", Duration::from_secs(3)) + } + } + }, + RunnerState::ErrorBackoff(msg, dur) => { + { + let mut state = self.client_state.lock().unwrap(); + *state = ClientState::Disconnected(Some(format!("{}, retrying in {} seconds...", msg, dur.as_secs()))); + } + thread::sleep(dur); + RunnerState::Disconnected + }, + RunnerState::Connected(ref mut stream) => { + { + let mut state = self.client_state.lock().unwrap(); + *state = ClientState::Connected; + } + + let mut buf = BytesMut::new(); + let mut rbuf = [0u8; 1024]; + + loop { + match stream.read(&mut rbuf[..]) { + Ok(0) => break, + Ok(n) => buf.put_slice(&rbuf[..n]), + Err(e) => eprintln!("read error: {:?}", e), + }; + + match proto::Metric::decode_length_delimited(&mut buf) { + Err(e) => eprintln!("decode error: {:?}", e), + Ok(msg) => { + let mut labels_raw = msg.labels.into_iter().collect::>(); + labels_raw.sort_by(|a, b| a.0.cmp(&b.0)); + let labels = labels_raw.into_iter().map(|(k, v)| Label::new(k, v)).collect::>(); + let key_data: KeyData = (msg.name, labels).into(); + + match msg.value.expect("no metric value") { + proto::metric::Value::Counter(value) => { + let key = CompositeKey::new(MetricKind::Counter, key_data.into()); + let mut metrics = self.metrics.write().unwrap(); + let counter = metrics.entry(key).or_insert_with(|| MetricData::Counter(0)); + if let MetricData::Counter(inner) = counter { + *inner += value.value; + } + }, + proto::metric::Value::Gauge(value) => { + let key = CompositeKey::new(MetricKind::Gauge, key_data.into()); + let mut metrics = self.metrics.write().unwrap(); + let gauge = metrics.entry(key).or_insert_with(|| MetricData::Gauge(0.0)); + if let MetricData::Gauge(inner) = gauge { + *inner = value.value; + } + }, + proto::metric::Value::Histogram(value) => { + let key = CompositeKey::new(MetricKind::Histogram, key_data.into()); + let mut metrics = self.metrics.write().unwrap(); + let histogram = metrics.entry(key).or_insert_with(|| { + let histogram = Histogram::new(3).expect("failed to create histogram"); + MetricData::Histogram(histogram) + }); + + if let MetricData::Histogram(inner) = histogram { + inner.record(value.value).expect("failed to record value to histogram"); + } + }, + } + }, + } + } + + RunnerState::ErrorBackoff("error while observing", Duration::from_secs(3)) + } + }; + self.state = next; + } + } +} \ No newline at end of file diff --git a/metrics-observer/src/selector.rs b/metrics-observer/src/selector.rs new file mode 100644 index 0000000..0b4ca3c --- /dev/null +++ b/metrics-observer/src/selector.rs @@ -0,0 +1,58 @@ +use tui::widgets::ListState; + +pub struct Selector(usize, ListState); + +impl Selector { + pub fn new() -> Selector { + let mut state = ListState::default(); + state.select(Some(0)); + Selector(0, state) + } + + pub fn set_length(&mut self, len: usize) { + if len < self.0 { + self.1.select(Some(0)); + } + self.0 = len; + } + + pub fn state(&mut self) -> &mut ListState { + &mut self.1 + } + + pub fn top(&mut self) { + self.1.select(Some(0)); + } + + pub fn bottom(&mut self) { + self.1.select(Some(self.0 - 1)); + } + + pub fn next(&mut self) { + let i = match self.1.selected() { + Some(i) => { + if i >= self.0 - 1 { + 0 + } else { + i + 1 + } + } + None => 0, + }; + self.1.select(Some(i)); + } + + pub fn previous(&mut self) { + let i = match self.1.selected() { + Some(i) => { + if i == 0 { + self.0 - 1 + } else { + i - 1 + } + } + None => 0, + }; + self.1.select(Some(i)); + } +} \ No newline at end of file