From 5763651a4115ab0c799ad6045f867154f51463e4 Mon Sep 17 00:00:00 2001 From: Toby Lawrence <toby@nuclearfurnace.com> Date: Sat, 24 Oct 2020 12:36:43 -0400 Subject: [PATCH] drop metrics if no clients are connected --- metrics-exporter-tcp/examples/tcp_client.rs | 4 +- metrics-exporter-tcp/src/lib.rs | 79 ++++++++++++++++++--- 2 files changed, 70 insertions(+), 13 deletions(-) diff --git a/metrics-exporter-tcp/examples/tcp_client.rs b/metrics-exporter-tcp/examples/tcp_client.rs index cc4e7d5..259b389 100644 --- a/metrics-exporter-tcp/examples/tcp_client.rs +++ b/metrics-exporter-tcp/examples/tcp_client.rs @@ -25,9 +25,9 @@ fn main() { Err(e) => eprintln!("read error: {:?}", e), }; - match proto::Metric::decode_length_delimited(&mut buf) { + match proto::Event::decode_length_delimited(&mut buf) { Err(e) => eprintln!("decode error: {:?}", e), - Ok(msg) => println!("metric: {:?}", msg), + Ok(msg) => println!("event: {:?}", msg), } } } diff --git a/metrics-exporter-tcp/src/lib.rs b/metrics-exporter-tcp/src/lib.rs index bcf965e..337cf9a 100644 --- a/metrics-exporter-tcp/src/lib.rs +++ b/metrics-exporter-tcp/src/lib.rs @@ -47,7 +47,10 @@ use std::collections::{BTreeMap, HashMap, VecDeque}; use std::io::{self, Write}; use std::net::SocketAddr; -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, +}; use std::thread; use std::time::SystemTime; @@ -105,10 +108,55 @@ impl From<io::Error> for Error { } } +#[derive(Clone)] +struct State { + client_count: Arc<Mutex<usize>>, + should_send: Arc<AtomicBool>, + waker: Arc<Waker>, +} + +impl State { + pub fn from_waker(waker: Waker) -> State { + State { + client_count: Arc::new(Mutex::new(0)), + should_send: Arc::new(AtomicBool::new(false)), + waker: Arc::new(waker), + } + } + + pub fn should_send(&self) -> bool { + self.should_send.load(Ordering::Relaxed) + } + + pub fn increment_clients(&self) { + // This is slightly overkill _but_ it means we can ensure no wrapping + // addition or subtraction, keeping our "if no clients, don't send" logic + // intact in the face of 
a logic mistake on our part. + let mut count = self.client_count.lock().unwrap(); + *count = count.saturating_add(1); + self.should_send.store(true, Ordering::SeqCst); + } + + pub fn decrement_clients(&self) { + // This is slightly overkill _but_ it means we can ensure no wrapping + // addition or subtraction, keeping our "if no clients, don't send" logic + // intact in the face of a logic mistake on our part. + let mut count = self.client_count.lock().unwrap(); + *count = count.saturating_sub(1); + if *count == 0 { + self.should_send.store(false, Ordering::SeqCst); + } + } + + pub fn wake(&self) { + let _ = self.waker.wake(); + } +} + /// A TCP recorder. pub struct TcpRecorder { tx: Sender<Event>, - waker: Arc<Waker>, + state: State, } /// Builder for creating and installing a TCP recorder/exporter. @@ -181,18 +229,19 @@ impl TcpBuilder { }; let poll = Poll::new()?; - let waker = Arc::new(Waker::new(poll.registry(), WAKER)?); + let waker = Waker::new(poll.registry(), WAKER)?; let mut listener = TcpListener::bind(self.listen_addr)?; poll.registry() .register(&mut listener, LISTENER, Interest::READABLE)?; + let state = State::from_waker(waker); let recorder = TcpRecorder { tx, - waker: Arc::clone(&waker), + state: state.clone(), }; - thread::spawn(move || run_transport(poll, waker, listener, rx, buffer_size)); + thread::spawn(move || run_transport(poll, listener, rx, state, buffer_size)); Ok(recorder) } } @@ -208,12 +257,14 @@ impl TcpRecorder { let _ = self .tx .try_send(Event::Metadata(key, metric_type, unit, description)); - let _ = self.waker.wake(); + self.state.wake(); } fn push_metric(&self, key: Key, value: MetricValue) { - let _ = self.tx.try_send(Event::Metric(key, value)); - let _ = self.waker.wake(); + if self.state.should_send() { + let _ = self.tx.try_send(Event::Metric(key, value)); + self.state.wake(); + } } } @@ -245,9 +296,9 @@ impl Recorder for TcpRecorder { fn run_transport( mut poll: Poll, - waker: Arc<Waker>, listener: TcpListener, rx: Receiver<Event>, + state: State, 
buffer_size: Option<usize>, ) { let buffer_limit = buffer_size.unwrap_or(std::usize::MAX); @@ -284,7 +335,7 @@ fn run_transport( if buffered_pmsgs.len() >= buffer_limit { // We didn't drain ourselves here, so schedule a future wake so we // continue to drain remaining metrics. - let _ = waker.wake(); + state.wake(); break; } @@ -329,6 +380,7 @@ fn run_transport( let done = drive_connection(conn, wbuf, msgs); if done { clients_to_remove.push(*token); + state.decrement_clients(); continue; } @@ -353,6 +405,7 @@ fn run_transport( let done = drive_connection(conn, wbuf, msgs); if done { clients_to_remove.push(*token); + state.decrement_clients(); } } @@ -365,6 +418,7 @@ fn run_transport( if let Some((conn, _, _)) = clients.get_mut(&token) { trace!(?conn, ?token, "removing client"); clients.remove(&token); + state.decrement_clients(); } } } @@ -379,6 +433,8 @@ fn run_transport( .register(&mut conn, token, CLIENT_INTEREST) .expect("failed to register interest for client connection"); + state.increment_clients(); + // Start tracking them, and enqueue all of the metadata. let metadata = generate_metadata_messages(&metadata); clients @@ -401,6 +457,7 @@ fn run_transport( if done { trace!(?conn, ?token, "removing client"); clients.remove(&token); + state.decrement_clients(); } } } @@ -448,7 +505,7 @@ fn drive_connection( }; match conn.write(&buf) { - // Zero write = client closed their connection, so remove 'em. + // Zero write = client closed their connection, so remove 'em. Ok(0) => { trace!(?conn, "zero write, closing client"); return true;