Merge pull request #112 from metrics-rs/drop_metrics_no_clients_tcp
metrics-exporter-tcp: drop metrics when no clients are connected.
This commit is contained in:
commit
d7efc64ff7
|
@ -25,9 +25,9 @@ fn main() {
|
|||
Err(e) => eprintln!("read error: {:?}", e),
|
||||
};
|
||||
|
||||
match proto::Metric::decode_length_delimited(&mut buf) {
|
||||
match proto::Event::decode_length_delimited(&mut buf) {
|
||||
Err(e) => eprintln!("decode error: {:?}", e),
|
||||
Ok(msg) => println!("metric: {:?}", msg),
|
||||
Ok(msg) => println!("event: {:?}", msg),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,7 +47,10 @@
|
|||
use std::collections::{BTreeMap, HashMap, VecDeque};
|
||||
use std::io::{self, Write};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc, Mutex,
|
||||
};
|
||||
use std::thread;
|
||||
use std::time::SystemTime;
|
||||
|
||||
|
@ -105,10 +108,55 @@ impl From<SetRecorderError> for Error {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct State {
|
||||
client_count: Arc<Mutex<usize>>,
|
||||
should_send: Arc<AtomicBool>,
|
||||
waker: Arc<Waker>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
pub fn from_waker(waker: Waker) -> State {
|
||||
State {
|
||||
client_count: Arc::new(Mutex::new(0)),
|
||||
should_send: Arc::new(AtomicBool::new(false)),
|
||||
waker: Arc::new(waker),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn should_send(&self) -> bool {
|
||||
self.should_send.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn increment_clients(&self) {
|
||||
// This is slightly overkill _but_ it means we can ensure no wrapping
|
||||
// addition or subtraction, keeping our "if no clients, don't send" logic
|
||||
// intact in the face of a logic mistake on our part.
|
||||
let mut count = self.client_count.lock().unwrap();
|
||||
*count = count.saturating_add(1);
|
||||
self.should_send.store(true, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
pub fn decrement_clients(&self) {
|
||||
// This is slightly overkill _but_ it means we can ensure no wrapping
|
||||
// addition or subtraction, keeping our "if no clients, don't send" logic
|
||||
// intact in the face of a logic mistake on our part.
|
||||
let mut count = self.client_count.lock().unwrap();
|
||||
*count = count.saturating_sub(1);
|
||||
if *count == 0 {
|
||||
self.should_send.store(false, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wake(&self) {
|
||||
let _ = self.waker.wake();
|
||||
}
|
||||
}
|
||||
|
||||
/// A TCP recorder.
|
||||
pub struct TcpRecorder {
|
||||
tx: Sender<Event>,
|
||||
waker: Arc<Waker>,
|
||||
state: State,
|
||||
}
|
||||
|
||||
/// Builder for creating and installing a TCP recorder/exporter.
|
||||
|
@ -181,18 +229,19 @@ impl TcpBuilder {
|
|||
};
|
||||
|
||||
let poll = Poll::new()?;
|
||||
let waker = Arc::new(Waker::new(poll.registry(), WAKER)?);
|
||||
let waker = Waker::new(poll.registry(), WAKER)?;
|
||||
|
||||
let mut listener = TcpListener::bind(self.listen_addr)?;
|
||||
poll.registry()
|
||||
.register(&mut listener, LISTENER, Interest::READABLE)?;
|
||||
|
||||
let state = State::from_waker(waker);
|
||||
let recorder = TcpRecorder {
|
||||
tx,
|
||||
waker: Arc::clone(&waker),
|
||||
state: state.clone(),
|
||||
};
|
||||
|
||||
thread::spawn(move || run_transport(poll, waker, listener, rx, buffer_size));
|
||||
thread::spawn(move || run_transport(poll, listener, rx, state, buffer_size));
|
||||
Ok(recorder)
|
||||
}
|
||||
}
|
||||
|
@ -208,12 +257,14 @@ impl TcpRecorder {
|
|||
let _ = self
|
||||
.tx
|
||||
.try_send(Event::Metadata(key, metric_type, unit, description));
|
||||
let _ = self.waker.wake();
|
||||
self.state.wake();
|
||||
}
|
||||
|
||||
fn push_metric(&self, key: Key, value: MetricValue) {
|
||||
let _ = self.tx.try_send(Event::Metric(key, value));
|
||||
let _ = self.waker.wake();
|
||||
if self.state.should_send() {
|
||||
let _ = self.tx.try_send(Event::Metric(key, value));
|
||||
self.state.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -245,9 +296,9 @@ impl Recorder for TcpRecorder {
|
|||
|
||||
fn run_transport(
|
||||
mut poll: Poll,
|
||||
waker: Arc<Waker>,
|
||||
listener: TcpListener,
|
||||
rx: Receiver<Event>,
|
||||
state: State,
|
||||
buffer_size: Option<usize>,
|
||||
) {
|
||||
let buffer_limit = buffer_size.unwrap_or(std::usize::MAX);
|
||||
|
@ -284,7 +335,7 @@ fn run_transport(
|
|||
if buffered_pmsgs.len() >= buffer_limit {
|
||||
// We didn't drain ourselves here, so schedule a future wake so we
|
||||
// continue to drain remaining metrics.
|
||||
let _ = waker.wake();
|
||||
state.wake();
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -329,6 +380,7 @@ fn run_transport(
|
|||
let done = drive_connection(conn, wbuf, msgs);
|
||||
if done {
|
||||
clients_to_remove.push(*token);
|
||||
state.decrement_clients();
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -353,6 +405,7 @@ fn run_transport(
|
|||
let done = drive_connection(conn, wbuf, msgs);
|
||||
if done {
|
||||
clients_to_remove.push(*token);
|
||||
state.decrement_clients();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -365,6 +418,7 @@ fn run_transport(
|
|||
if let Some((conn, _, _)) = clients.get_mut(&token) {
|
||||
trace!(?conn, ?token, "removing client");
|
||||
clients.remove(&token);
|
||||
state.decrement_clients();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -379,6 +433,8 @@ fn run_transport(
|
|||
.register(&mut conn, token, CLIENT_INTEREST)
|
||||
.expect("failed to register interest for client connection");
|
||||
|
||||
state.increment_clients();
|
||||
|
||||
// Start tracking them, and enqueue all of the metadata.
|
||||
let metadata = generate_metadata_messages(&metadata);
|
||||
clients
|
||||
|
@ -401,6 +457,7 @@ fn run_transport(
|
|||
if done {
|
||||
trace!(?conn, ?token, "removing client");
|
||||
clients.remove(&token);
|
||||
state.decrement_clients();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -448,7 +505,7 @@ fn drive_connection(
|
|||
};
|
||||
|
||||
match conn.write(&buf) {
|
||||
// Zero write = client closedd their connection, so remove 'em.
|
||||
// Zero write = client closed their connection, so remove 'em.
|
||||
Ok(0) => {
|
||||
trace!(?conn, "zero write, closing client");
|
||||
return true;
|
||||
|
|
Loading…
Reference in New Issue