cleanup of tcp exporter

This commit is contained in:
Toby Lawrence 2020-06-01 16:23:11 -04:00
parent 175ee28167
commit 71ca17754f
4 changed files with 54 additions and 15 deletions

View File

@ -23,3 +23,5 @@ built = "^0.3"
[dev-dependencies]
quanta = "^0.5"
tracing = "^0.1"
tracing-subscriber = "^0.2"

View File

@ -17,9 +17,13 @@ fn main() {
loop {
match stream.read(&mut rbuf[..]) {
Ok(0) => {
println!("server disconnected, closing");
break
}
Ok(n) => buf.put_slice(&rbuf[..n]),
Err(e) => eprintln!("read error: {:?}", e),
}
};
match proto::Metric::decode_length_delimited(&mut buf) {
Err(e) => eprintln!("decode error: {:?}", e),

View File

@ -1,12 +1,14 @@
use std::thread;
use std::time::{Duration, Instant};
use std::time::Duration;
use metrics::{histogram, increment};
use metrics_tcp::TcpBuilder;
use metrics_exporter_tcp::TcpBuilder;
use quanta::Clock;
fn main() {
tracing_subscriber::fmt::init();
let builder = TcpBuilder::new();
builder.install().expect("failed to install TCP recorder");

View File

@ -49,6 +49,7 @@ impl CompositeKey {
}
}
// Errors that could occur while install a TCP recorder/exporter.
#[derive(Debug)]
pub enum Error {
Io(io::Error),
@ -70,35 +71,60 @@ impl From<SetRecorderError> for Error {
struct TcpRecorder {
registry: Arc<TcpRegistry>,
tx: Sender<(Identifier, MetricValue)>,
waker: Waker,
waker: Arc<Waker>,
}
/// Builder for creating and installing a TCP recorder/exporter.
pub struct TcpBuilder {
addr: SocketAddr,
listen_addr: SocketAddr,
buffer_size: Option<usize>,
}
impl TcpBuilder {
/// Creates a new `TcpBuilder`.
pub fn new() -> TcpBuilder {
TcpBuilder {
addr: ([127, 0, 0, 1], 5000).into(),
listen_addr: ([127, 0, 0, 1], 5000).into(),
buffer_size: Some(1024),
}
}
pub fn bind_address<A>(mut self, addr: A) -> TcpBuilder
/// Sets the listen address.
///
/// The exporter will accept connections on this address and immediately begin forwarding
/// metrics to the client.
///
/// Defaults to `127.0.0.1:5000`.
pub fn listen_address<A>(mut self, addr: A) -> TcpBuilder
where
A: Into<SocketAddr>,
{
self.addr = addr.into();
self.listen_addr = addr.into();
self
}
/// Sets the buffer size for internal operations.
///
/// The buffer size controls two operational aspects: the number of metrics processed
/// per iteration of the event loop, and the number of buffered metrics each client
/// can hold.
///
/// This setting allows trading off responsiveness for throughput, where a smaller buffer
/// size will ensure that metrics are pushed to clients sooner, versus a larger buffer
/// size that allows us to push more at a time.alloc
///
/// As well, the larger the buffer, the more messages a client can temporarily hold.
/// Clients have a circular buffer implementation so if their buffers are full, metrics
/// will be dropped as necessary to avoid backpressure in the recorder.
pub fn buffer_size(mut self, size: Option<usize>) -> TcpBuilder {
self.buffer_size = size;
self
}
/// Installs the recorder and exporter.
///
/// An error will be returned if there's an issue with creating the TCP server or with
/// installing the recorder as the global recorder.
pub fn install(self) -> Result<(), Error> {
let buffer_size = self.buffer_size;
let (tx, rx) = match buffer_size {
@ -107,9 +133,9 @@ impl TcpBuilder {
};
let poll = Poll::new()?;
let waker = Waker::new(poll.registry(), WAKER)?;
let waker = Arc::new(Waker::new(poll.registry(), WAKER)?);
let mut listener = TcpListener::bind(self.addr)?;
let mut listener = TcpListener::bind(self.listen_addr)?;
poll.registry()
.register(&mut listener, LISTENER, Interest::READABLE)?;
@ -118,11 +144,11 @@ impl TcpBuilder {
let recorder = TcpRecorder {
registry: Arc::clone(&registry),
tx,
waker,
waker: Arc::clone(&waker),
};
metrics::set_boxed_recorder(Box::new(recorder))?;
thread::spawn(move || run_transport(registry, poll, listener, rx, buffer_size));
thread::spawn(move || run_transport(registry, poll, waker, listener, rx, buffer_size));
Ok(())
}
}
@ -168,6 +194,7 @@ impl Recorder for TcpRecorder {
fn run_transport(
registry: Arc<TcpRegistry>,
mut poll: Poll,
waker: Arc<Waker>,
listener: TcpListener,
rx: Receiver<(Identifier, MetricValue)>,
buffer_size: Option<usize>,
@ -191,8 +218,9 @@ fn run_transport(
}
drop(_evspan);
// undo when mio can show poll event count
// trace!("return from poll, events = events.len());
// Technically, this is an abuse of size_hint() but Mio will return the number of events
// for both parts of the tuple.
trace!(events = events.iter().size_hint().0, "return from poll");
let _pspan = trace_span!("process events");
for event in events.iter() {
@ -202,6 +230,9 @@ fn run_transport(
let _mrxspan = trace_span!("metrics in");
loop {
if buffered_pmsgs.len() >= buffer_limit {
// We didn't drain ourselves here, so schedule a future wake so we
// continue to drain remaining metrics.
let _ = waker.wake();
break;
}
@ -314,7 +345,7 @@ fn run_transport(
}
}
#[tracing::instrument]
#[tracing::instrument(skip(wbuf, msgs))]
fn drive_connection(
conn: &mut TcpStream,
wbuf: &mut Option<Bytes>,