drop metrics if no clients are connected

This commit is contained in:
Toby Lawrence 2020-10-24 12:36:43 -04:00
parent 6e6761acc3
commit 5763651a41
2 changed files with 70 additions and 13 deletions

View File

@ -25,9 +25,9 @@ fn main() {
Err(e) => eprintln!("read error: {:?}", e),
};
match proto::Metric::decode_length_delimited(&mut buf) {
match proto::Event::decode_length_delimited(&mut buf) {
Err(e) => eprintln!("decode error: {:?}", e),
Ok(msg) => println!("metric: {:?}", msg),
Ok(msg) => println!("event: {:?}", msg),
}
}
}

View File

@ -47,7 +47,10 @@
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::io::{self, Write};
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use std::thread;
use std::time::SystemTime;
@ -105,10 +108,55 @@ impl From<SetRecorderError> for Error {
}
}
#[derive(Clone)]
struct State {
client_count: Arc<Mutex<usize>>,
should_send: Arc<AtomicBool>,
waker: Arc<Waker>,
}
impl State {
pub fn from_waker(waker: Waker) -> State {
State {
client_count: Arc::new(Mutex::new(0)),
should_send: Arc::new(AtomicBool::new(false)),
waker: Arc::new(waker),
}
}
pub fn should_send(&self) -> bool {
self.should_send.load(Ordering::Relaxed)
}
pub fn increment_clients(&self) {
// This is slightly overkill _but_ it means we can ensure no wrapping
// addition or subtraction, keeping our "if no clients, don't send" logic
// intact in the face of a logic mistake on our part.
let mut count = self.client_count.lock().unwrap();
*count = count.saturating_add(1);
self.should_send.store(true, Ordering::SeqCst);
}
pub fn decrement_clients(&self) {
// This is slightly overkill _but_ it means we can ensure no wrapping
// addition or subtraction, keeping our "if no clients, don't send" logic
// intact in the face of a logic mistake on our part.
let mut count = self.client_count.lock().unwrap();
*count = count.saturating_sub(1);
if *count == 0 {
self.should_send.store(false, Ordering::SeqCst);
}
}
pub fn wake(&self) {
let _ = self.waker.wake();
}
}
/// A TCP recorder.
pub struct TcpRecorder {
tx: Sender<Event>,
waker: Arc<Waker>,
state: State,
}
/// Builder for creating and installing a TCP recorder/exporter.
@ -181,18 +229,19 @@ impl TcpBuilder {
};
let poll = Poll::new()?;
let waker = Arc::new(Waker::new(poll.registry(), WAKER)?);
let waker = Waker::new(poll.registry(), WAKER)?;
let mut listener = TcpListener::bind(self.listen_addr)?;
poll.registry()
.register(&mut listener, LISTENER, Interest::READABLE)?;
let state = State::from_waker(waker);
let recorder = TcpRecorder {
tx,
waker: Arc::clone(&waker),
state: state.clone(),
};
thread::spawn(move || run_transport(poll, waker, listener, rx, buffer_size));
thread::spawn(move || run_transport(poll, listener, rx, state, buffer_size));
Ok(recorder)
}
}
@ -208,12 +257,14 @@ impl TcpRecorder {
let _ = self
.tx
.try_send(Event::Metadata(key, metric_type, unit, description));
let _ = self.waker.wake();
self.state.wake();
}
fn push_metric(&self, key: Key, value: MetricValue) {
let _ = self.tx.try_send(Event::Metric(key, value));
let _ = self.waker.wake();
if self.state.should_send() {
let _ = self.tx.try_send(Event::Metric(key, value));
self.state.wake();
}
}
}
@ -245,9 +296,9 @@ impl Recorder for TcpRecorder {
fn run_transport(
mut poll: Poll,
waker: Arc<Waker>,
listener: TcpListener,
rx: Receiver<Event>,
state: State,
buffer_size: Option<usize>,
) {
let buffer_limit = buffer_size.unwrap_or(std::usize::MAX);
@ -284,7 +335,7 @@ fn run_transport(
if buffered_pmsgs.len() >= buffer_limit {
// We didn't drain ourselves here, so schedule a future wake so we
// continue to drain remaining metrics.
let _ = waker.wake();
state.wake();
break;
}
@ -329,6 +380,7 @@ fn run_transport(
let done = drive_connection(conn, wbuf, msgs);
if done {
clients_to_remove.push(*token);
state.decrement_clients();
continue;
}
@ -353,6 +405,7 @@ fn run_transport(
let done = drive_connection(conn, wbuf, msgs);
if done {
clients_to_remove.push(*token);
state.decrement_clients();
}
}
@ -365,6 +418,7 @@ fn run_transport(
if let Some((conn, _, _)) = clients.get_mut(&token) {
trace!(?conn, ?token, "removing client");
clients.remove(&token);
state.decrement_clients();
}
}
}
@ -379,6 +433,8 @@ fn run_transport(
.register(&mut conn, token, CLIENT_INTEREST)
.expect("failed to register interest for client connection");
state.increment_clients();
// Start tracking them, and enqueue all of the metadata.
let metadata = generate_metadata_messages(&metadata);
clients
@ -401,6 +457,7 @@ fn run_transport(
if done {
trace!(?conn, ?token, "removing client");
clients.remove(&token);
state.decrement_clients();
}
}
}
@ -448,7 +505,7 @@ fn drive_connection(
};
match conn.write(&buf) {
// Zero write = client closedd their connection, so remove 'em.
// Zero write = client closed their connection, so remove 'em.
Ok(0) => {
trace!(?conn, "zero write, closing client");
return true;