rough impl of metrics-observer

This commit is contained in:
Toby Lawrence 2020-10-05 10:22:45 -04:00
parent 2125de533b
commit 623ccf110a
8 changed files with 536 additions and 0 deletions

View File

@ -7,4 +7,5 @@ members = [
"metrics-exporter-tcp",
"metrics-exporter-prometheus",
"metrics-tracing-context",
"metrics-observer",
]

View File

@ -0,0 +1,25 @@
[package]
name = "metrics-observer"
version = "0.1.0"
authors = ["Toby Lawrence <toby@nuclearfurnace.com>"]
edition = "2018"
license = "MIT"
[dependencies]
getopts = "0.2"
bytes = "0.5"
crossbeam-channel = "0.4"
prost = "0.6"
prost-types = "0.6"
tui = "0.12"
termion = "1.5"
hdrhistogram = "7.1"
evmap = "10.0"
chrono = "0.4"
metrics = { version = "0.13.0-alpha.5", path = "../metrics" }
metrics-util = { version = "0.4.0-alpha.3", path = "../metrics-util" }
[build-dependencies]
prost-build = "0.6"
built = "0.4"

View File

@ -0,0 +1,9 @@
fn main() {
println!("cargo:rerun-if-changed=proto/event.proto");
let mut prost_build = prost_build::Config::new();
prost_build.btree_map(&["."]);
prost_build
.compile_protos(&["proto/event.proto"], &["proto/"])
.unwrap();
built::write_built_file().unwrap();
}

View File

@ -0,0 +1,28 @@
syntax = "proto3";
import "google/protobuf/timestamp.proto";
package event.proto;
message Metric {
string name = 1;
google.protobuf.Timestamp timestamp = 2;
map<string, string> labels = 3;
oneof value {
Counter counter = 4;
Gauge gauge = 5;
Histogram histogram = 6;
}
}
message Counter {
uint64 value = 1;
}
message Gauge {
double value = 1;
}
message Histogram {
uint64 value = 1;
}

View File

@ -0,0 +1,40 @@
use std::io;
use std::thread;
use std::time::Duration;
use crossbeam_channel::{bounded, Receiver, TrySendError, RecvTimeoutError};
use termion::event::Key;
use termion::input::TermRead;
pub struct InputEvents {
rx: Receiver<Key>,
handle: thread::JoinHandle<()>,
}
impl InputEvents {
pub fn new() -> InputEvents {
let (tx, rx) = bounded(1);
let handle = thread::spawn(move || {
let stdin = io::stdin();
for evt in stdin.keys() {
if let Ok(key) = evt {
// If our queue is full, we don't care. The user can just press the key again.
if let Err(TrySendError::Disconnected(_)) = tx.try_send(key) {
eprintln!("input event channel disconnected");
return;
}
}
}
});
Self { rx, handle }
}
pub fn next(&mut self) -> Result<Option<Key>, RecvTimeoutError> {
match self.rx.recv_timeout(Duration::from_secs(1)) {
Ok(key) => Ok(Some(key)),
Err(RecvTimeoutError::Timeout) => Ok(None),
Err(e) => Err(e),
}
}
}

View File

@ -0,0 +1,154 @@
use std::{error::Error, io};
use chrono::Local;
use termion::{event::Key, input::MouseTerminal, raw::IntoRawMode, screen::AlternateScreen};
use tui::{
backend::TermionBackend,
layout::{Constraint, Direction, Layout},
style::{Color, Modifier, Style},
text::{Span, Spans},
widgets::{Block, Borders, Paragraph, Wrap, List, ListItem},
Terminal,
};
mod input;
use self::input::InputEvents;
mod metrics;
use self::metrics::{ClientState, MetricData};
mod selector;
use self::selector::Selector;
fn main() -> Result<(), Box<dyn Error>> {
let stdout = io::stdout().into_raw_mode()?;
let stdout = MouseTerminal::from(stdout);
let stdout = AlternateScreen::from(stdout);
let backend = TermionBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
let mut events = InputEvents::new();
let client = metrics::Client::new("127.0.0.1:5000".to_string());
let mut selector = Selector::new();
loop {
terminal.draw(|f| {
let chunks = Layout::default()
.direction(Direction::Vertical)
.margin(1)
.constraints(
[
Constraint::Length(4),
Constraint::Percentage(90)
].as_ref()
)
.split(f.size());
let current_dt = Local::now().format(" (%Y/%m/%d %I:%M:%S %p)").to_string();
let client_state = match client.state() {
ClientState::Disconnected(s) => {
let mut spans = vec![
Span::raw("state: "),
Span::styled("disconnected", Style::default().fg(Color::Red)),
];
if let Some(s) = s {
spans.push(Span::raw(" "));
spans.push(Span::raw(s));
}
Spans::from(spans)
},
ClientState::Connected => Spans::from(vec![
Span::raw("state: "),
Span::styled("connected", Style::default().fg(Color::Green)),
]),
};
let header_block = Block::default()
.title(vec![
Span::styled("metrics-observer", Style::default().add_modifier(Modifier::BOLD)),
Span::raw(current_dt),
])
.borders(Borders::ALL);
let text = vec![
client_state.into(),
Spans::from(vec![
Span::styled("controls: ", Style::default().add_modifier(Modifier::BOLD)),
Span::raw("up/down = scroll, q = quit"),
]),
];
let header = Paragraph::new(text)
.block(header_block)
.wrap(Wrap { trim: true });
f.render_widget(header, chunks[0]);
// Knock 5 off the line width to account for 3-character highlight symbol + borders.
let line_width = chunks[1].width.saturating_sub(6) as usize;
let items = client.with_metrics(|metrics| {
let mut items = Vec::new();
for (key, value) in metrics.iter() {
let inner_key = key.key();
let name = inner_key.name();
let labels = inner_key.labels().map(|label| format!("{} = {}", label.key(), label.value())).collect::<Vec<_>>();
let display_name = if labels.is_empty() {
name.to_string()
} else {
format!("{} [{}]", name, labels.join(", "))
};
let display_value = match value {
MetricData::Counter(value) => format!("total: {}", value),
MetricData::Gauge(value) => format!("current: {}", value),
MetricData::Histogram(value) => {
let min = value.min();
let max = value.max();
let p50 = value.value_at_quantile(0.5);
let p99 = value.value_at_quantile(0.99);
let p999 = value.value_at_quantile(0.999);
format!("min: {} p50: {} p99: {} p999: {} max: {}",
min, p50, p99, p999, max)
},
};
let name_length = display_name.chars().count();
let value_length = display_value.chars().count();
let space = line_width.saturating_sub(name_length).saturating_sub(value_length);
let display = format!("{}{}{}", display_name, " ".repeat(space), display_value);
items.push(ListItem::new(display));
}
items
});
selector.set_length(items.len());
let metrics_block = Block::default()
.title("observed metrics")
.borders(Borders::ALL);
let metrics = List::new(items)
.block(metrics_block)
.highlight_symbol(">> ");
f.render_stateful_widget(metrics, chunks[1], selector.state());
})?;
// Poll the event queue for input events. `next` will only block for 1 second,
// so our screen is never stale by more than 1 second.
if let Some(input) = events.next()? {
match input {
Key::Char('q') => break,
Key::Up => selector.previous(),
Key::Down => selector.next(),
Key::PageUp => selector.top(),
Key::PageDown => selector.bottom(),
_ => {},
}
}
}
Ok(())
}

View File

@ -0,0 +1,221 @@
use std::collections::HashMap;
use std::net::TcpStream;
use std::time::Duration;
use std::thread;
use std::net::ToSocketAddrs;
use std::sync::{Arc, Mutex, RwLock};
use std::io::Read;
use bytes::{BufMut, BytesMut};
use prost::Message;
use hdrhistogram::Histogram;
use metrics::{Label, KeyData};
use metrics_util::{CompositeKey, MetricKind};
mod proto {
include!(concat!(env!("OUT_DIR"), "/event.proto.rs"));
}
#[derive(Clone)]
pub enum ClientState {
Disconnected(Option<String>),
Connected,
}
pub enum MetricData {
Counter(u64),
Gauge(f64),
Histogram(Histogram<u64>),
}
pub struct Client {
state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<HashMap<CompositeKey, MetricData>>>,
handle: thread::JoinHandle<()>,
}
impl Client {
pub fn new(addr: String) -> Client {
let state = Arc::new(Mutex::new(ClientState::Disconnected(None)));
let metrics = Arc::new(RwLock::new(HashMap::new()));
let handle = {
let state = state.clone();
let metrics = metrics.clone();
thread::spawn(move || {
let mut runner = Runner::new(addr, state, metrics);
runner.run();
})
};
Client {
state,
metrics,
handle,
}
}
pub fn state(&self) -> ClientState {
self.state.lock().unwrap().clone()
}
pub fn with_metrics<F, T>(&self, f: F) -> T
where
F: FnOnce(&HashMap<CompositeKey, MetricData>) -> T,
{
let handle = self.metrics.read().unwrap();
f(&handle)
}
}
enum RunnerState {
Disconnected,
ErrorBackoff(&'static str, Duration),
Connected(TcpStream),
}
struct Runner {
state: RunnerState,
addr: String,
client_state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<HashMap<CompositeKey, MetricData>>>,
}
impl Runner {
pub fn new(
addr: String,
state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<HashMap<CompositeKey, MetricData>>>,
) -> Runner {
Runner {
state: RunnerState::Disconnected,
addr,
client_state: state,
metrics,
}
}
/*pub fn run(&mut self) {
let mut metrics = self.metrics.write().unwrap();
metrics.insert(("test_counter".into(), Vec::new()), MetricData::Counter(42));
metrics.insert(
("test_counter_two".into(), vec!["endpoint = http".to_string()]),
MetricData::Counter(42)
);
metrics.insert(("test_gauge".into(), Vec::new()), MetricData::Gauge(-666));
let mut histogram = Histogram::<u64>::new(3)
.expect("failed to create histogram");
for i in 1..100 {
histogram.record(i).expect("failed to record value");
}
metrics.insert(("test_histogram".into(), Vec::new()), MetricData::Histogram(histogram));
}*/
pub fn run(&mut self) {
loop {
let next = match self.state {
RunnerState::Disconnected => {
// Just reset the client state here to be sure.
{
let mut state = self.client_state.lock().unwrap();
*state = ClientState::Disconnected(None);
}
// Try to connect to our target and transition into Connected.
let addr = match self.addr.to_socket_addrs() {
Ok(mut addrs) => match addrs.next() {
Some(addr) => addr,
None => {
let mut state = self.client_state.lock().unwrap();
*state = ClientState::Disconnected(Some("failed to resolve specified host".to_string()));
break;
}
}
Err(_) => {
let mut state = self.client_state.lock().unwrap();
*state = ClientState::Disconnected(Some("failed to resolve specified host".to_string()));
break;
}
};
match TcpStream::connect_timeout(&addr, Duration::from_secs(3)) {
Ok(stream) => RunnerState::Connected(stream),
Err(_) => {
RunnerState::ErrorBackoff("error while connecting", Duration::from_secs(3))
}
}
},
RunnerState::ErrorBackoff(msg, dur) => {
{
let mut state = self.client_state.lock().unwrap();
*state = ClientState::Disconnected(Some(format!("{}, retrying in {} seconds...", msg, dur.as_secs())));
}
thread::sleep(dur);
RunnerState::Disconnected
},
RunnerState::Connected(ref mut stream) => {
{
let mut state = self.client_state.lock().unwrap();
*state = ClientState::Connected;
}
let mut buf = BytesMut::new();
let mut rbuf = [0u8; 1024];
loop {
match stream.read(&mut rbuf[..]) {
Ok(0) => break,
Ok(n) => buf.put_slice(&rbuf[..n]),
Err(e) => eprintln!("read error: {:?}", e),
};
match proto::Metric::decode_length_delimited(&mut buf) {
Err(e) => eprintln!("decode error: {:?}", e),
Ok(msg) => {
let mut labels_raw = msg.labels.into_iter().collect::<Vec<_>>();
labels_raw.sort_by(|a, b| a.0.cmp(&b.0));
let labels = labels_raw.into_iter().map(|(k, v)| Label::new(k, v)).collect::<Vec<_>>();
let key_data: KeyData = (msg.name, labels).into();
match msg.value.expect("no metric value") {
proto::metric::Value::Counter(value) => {
let key = CompositeKey::new(MetricKind::Counter, key_data.into());
let mut metrics = self.metrics.write().unwrap();
let counter = metrics.entry(key).or_insert_with(|| MetricData::Counter(0));
if let MetricData::Counter(inner) = counter {
*inner += value.value;
}
},
proto::metric::Value::Gauge(value) => {
let key = CompositeKey::new(MetricKind::Gauge, key_data.into());
let mut metrics = self.metrics.write().unwrap();
let gauge = metrics.entry(key).or_insert_with(|| MetricData::Gauge(0.0));
if let MetricData::Gauge(inner) = gauge {
*inner = value.value;
}
},
proto::metric::Value::Histogram(value) => {
let key = CompositeKey::new(MetricKind::Histogram, key_data.into());
let mut metrics = self.metrics.write().unwrap();
let histogram = metrics.entry(key).or_insert_with(|| {
let histogram = Histogram::new(3).expect("failed to create histogram");
MetricData::Histogram(histogram)
});
if let MetricData::Histogram(inner) = histogram {
inner.record(value.value).expect("failed to record value to histogram");
}
},
}
},
}
}
RunnerState::ErrorBackoff("error while observing", Duration::from_secs(3))
}
};
self.state = next;
}
}
}

View File

@ -0,0 +1,58 @@
use tui::widgets::ListState;
pub struct Selector(usize, ListState);
impl Selector {
pub fn new() -> Selector {
let mut state = ListState::default();
state.select(Some(0));
Selector(0, state)
}
pub fn set_length(&mut self, len: usize) {
if len < self.0 {
self.1.select(Some(0));
}
self.0 = len;
}
pub fn state(&mut self) -> &mut ListState {
&mut self.1
}
pub fn top(&mut self) {
self.1.select(Some(0));
}
pub fn bottom(&mut self) {
self.1.select(Some(self.0 - 1));
}
pub fn next(&mut self) {
let i = match self.1.selected() {
Some(i) => {
if i >= self.0 - 1 {
0
} else {
i + 1
}
}
None => 0,
};
self.1.select(Some(i));
}
pub fn previous(&mut self) {
let i = match self.1.selected() {
Some(i) => {
if i == 0 {
self.0 - 1
} else {
i - 1
}
}
None => 0,
};
self.1.select(Some(i));
}
}