diff --git a/Cargo.toml b/Cargo.toml index 3dc88b791..a8034c572 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,3 +80,4 @@ p2p = "0.5.2" futures = "0.1.21" clap = "2.31" reqwest = "0.8.6" +influx_db_client = "0.3.4" diff --git a/src/drone.rs b/src/drone.rs index 2aab65dc6..88ae672d0 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -4,6 +4,8 @@ //! checking requests against a request cap for a given time time_slice //! and (to come) an IP rate limit. +use influx_db_client as influxdb; +use metrics; use signature::{KeyPair, PublicKey}; use std::io; use std::io::{Error, ErrorKind}; @@ -121,6 +123,19 @@ impl Drone { } if self.check_request_limit(request_amount) { self.request_current += request_amount; + metrics::submit( + influxdb::Point::new("drone") + .add_tag("op", influxdb::Value::String("airdrop".to_string())) + .add_field( + "request_amount", + influxdb::Value::Integer(request_amount as i64), + ) + .add_field( + "request_current", + influxdb::Value::Integer(self.request_current as i64), + ) + .to_owned(), + ); client.transfer_signed(tx) } else { Err(Error::new(ErrorKind::Other, "token limit reached")) @@ -128,6 +143,12 @@ impl Drone { } } +impl Drop for Drone { + fn drop(&mut self) { + metrics::flush(); + } +} + #[cfg(test)] mod tests { use bank::Bank; diff --git a/src/lib.rs b/src/lib.rs index d91604f0b..68ef9e686 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ pub mod fullnode; pub mod hash; pub mod ledger; pub mod logger; +pub mod metrics; pub mod mint; pub mod nat; pub mod ncp; @@ -72,4 +73,5 @@ extern crate untrusted; #[macro_use] extern crate matches; +extern crate influx_db_client; extern crate rand; diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 000000000..e172fe590 --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,303 @@ +//! The `metrics` module enables sending measurements to an InfluxDB instance + +use influx_db_client as influxdb; +use std::env; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; +use std::sync::{Arc, Barrier, Mutex, Once, ONCE_INIT}; +use std::thread; +use std::time::{Duration, Instant}; +use timing; + +#[derive(Debug)] +enum MetricsCommand { + Submit(influxdb::Point), + Flush(Arc), +} + +struct MetricsAgent { + sender: Sender, +} + +trait MetricsWriter { + // Write the points and empty the vector. Called on the internal + // MetricsAgent worker thread. + fn write(&self, points: Vec); +} + +struct InfluxDbMetricsWriter { + client: Option, +} + +impl InfluxDbMetricsWriter { + fn new() -> Self { + InfluxDbMetricsWriter { + client: Self::build_client(), + } + } + + fn build_client() -> Option { + let host = env::var("INFLUX_HOST").unwrap_or("https://metrics.solana.com:8086".to_string()); + let db = env::var("INFLUX_DATABASE").unwrap_or("scratch".to_string()); + let username = env::var("INFLUX_USERNAME").unwrap_or("scratch_writer".to_string()); + let password = env::var("INFLUX_PASSWORD").unwrap_or("topsecret".to_string()); + + debug!("InfluxDB host={} db={} username={}", host, db, username); + let mut client = influxdb::Client::new_with_option(host, db, None) + .set_authentication(username, password); + + client.set_read_timeout(1 /*second*/); + client.set_write_timeout(1 /*second*/); + + debug!("InfluxDB version: {:?}", client.get_version()); + Some(client) + } +} + +impl MetricsWriter for InfluxDbMetricsWriter { + fn write(&self, points: Vec) { + if let Some(ref client) = self.client { + if let Err(err) = client.write_points( + influxdb::Points { point: points }, + Some(influxdb::Precision::Milliseconds), + None, + ) { + debug!("InfluxDbMetricsWriter write error: {:?}", err); + } + } + } +} + +impl Default for MetricsAgent { + fn default() -> Self { + Self::new( + Arc::new(InfluxDbMetricsWriter::new()), + Duration::from_secs(10), + ) + } +} + +impl MetricsAgent { + fn new(writer: Arc, write_frequency: Duration) -> Self { + let (sender, receiver) = channel::(); + thread::spawn(move || Self::run(receiver, writer, write_frequency)); + MetricsAgent { sender } + } + + fn run( + receiver: Receiver, + writer: Arc, + write_frequency: Duration, + ) { + trace!("run: enter"); + let mut last_write_time = Instant::now(); + let mut points = Vec::new(); + + loop { + match receiver.recv_timeout(write_frequency / 2) { + Ok(cmd) => match cmd { + MetricsCommand::Flush(barrier) => { + debug!("metrics_thread: flush"); + if !points.is_empty() { + writer.write(points); + points = Vec::new(); + last_write_time = Instant::now(); + } + barrier.wait(); + } + MetricsCommand::Submit(point) => { + debug!("run: submit {:?}", point); + points.push(point); + } + }, + Err(RecvTimeoutError::Timeout) => { + trace!("run: receive timeout"); + } + Err(RecvTimeoutError::Disconnected) => { + debug!("run: sender disconnected"); + break; + } + } + + let now = Instant::now(); + if now.duration_since(last_write_time) >= write_frequency { + if !points.is_empty() { + debug!("run: writing {} points", points.len()); + writer.write(points); + points = Vec::new(); + last_write_time = now; + } + } + } + trace!("run: exit"); + } + + pub fn submit(&self, mut point: influxdb::Point) { + if point.timestamp.is_none() { + point.timestamp = Some(timing::timestamp() as i64); + } + debug!("Submitting point: {:?}", point); + self.sender.send(MetricsCommand::Submit(point)).unwrap(); + } + + pub fn flush(&self) { + debug!("Flush"); + let barrier = Arc::new(Barrier::new(2)); + self.sender + .send(MetricsCommand::Flush(Arc::clone(&barrier))) + .unwrap(); + + barrier.wait(); + } +} + +impl Drop for MetricsAgent { + fn drop(&mut self) { + self.flush(); + } +} + +fn get_singleton_agent() -> Arc> { + static INIT: Once = ONCE_INIT; + static mut AGENT: Option>> = None; + unsafe { + INIT.call_once(|| AGENT = Some(Arc::new(Mutex::new(MetricsAgent::default())))); + match AGENT { + Some(ref agent) => agent.clone(), + None => panic!("Failed to initialize metrics agent"), + } + } +} + +/// Submits a new point from any thread. Note that points are internally queued +/// and transmitted periodically in batches. +pub fn submit(point: influxdb::Point) { + let agent_mutex = get_singleton_agent(); + let agent = agent_mutex.lock().unwrap(); + agent.submit(point); +} + +/// Blocks until all pending points from previous calls to `submit` have been +/// transmitted. +pub fn flush() { + let agent_mutex = get_singleton_agent(); + let agent = agent_mutex.lock().unwrap(); + agent.flush(); +} + +#[cfg(test)] +mod test { + use super::*; + use rand::random; + use std::sync::atomic::{AtomicUsize, Ordering}; + + struct MockMetricsWriter { + points_written: AtomicUsize, + } + impl MockMetricsWriter { + fn new() -> Self { + MockMetricsWriter { + points_written: AtomicUsize::new(0), + } + } + + fn points_written(&self) -> usize { + return self.points_written.load(Ordering::SeqCst); + } + } + + impl MetricsWriter for MockMetricsWriter { + fn write(&self, points: Vec) { + assert!(!points.is_empty()); + + self.points_written + .fetch_add(points.len(), Ordering::SeqCst); + + println!( + "Writing {} points ({} total)", + points.len(), + self.points_written.load(Ordering::SeqCst) + ); + } + } + + #[test] + fn test_submit() { + let writer = Arc::new(MockMetricsWriter::new()); + let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10)); + + for i in 0..42 { + agent.submit(influxdb::Point::new(&format!("measurement {}", i))); + } + + agent.flush(); + assert_eq!(writer.points_written(), 42); + } + + #[test] + fn test_submit_with_delay() { + let writer = Arc::new(MockMetricsWriter::new()); + let agent = MetricsAgent::new(writer.clone(), Duration::from_millis(100)); + + agent.submit(influxdb::Point::new("point 1")); + thread::sleep(Duration::from_secs(2)); + assert_eq!(writer.points_written(), 1); + } + + #[test] + fn test_multithread_submit() { + let writer = Arc::new(MockMetricsWriter::new()); + let agent = Arc::new(Mutex::new(MetricsAgent::new( + writer.clone(), + Duration::from_secs(10), + ))); + + // + // Submit measurements from different threads + // + let mut threads = Vec::new(); + for i in 0..42 { + let point = influxdb::Point::new(&format!("measurement {}", i)); + let agent = Arc::clone(&agent); + threads.push(thread::spawn(move || { + agent.lock().unwrap().submit(point); + })); + } + + for thread in threads { + thread.join().unwrap(); + } + + agent.lock().unwrap().flush(); + assert_eq!(writer.points_written(), 42); + } + + #[test] + fn test_flush_before_drop() { + let writer = Arc::new(MockMetricsWriter::new()); + { + let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(9999999)); + agent.submit(influxdb::Point::new("point 1")); + } + + assert_eq!(writer.points_written(), 1); + } + + #[test] + fn test_live_submit() { + let agent = MetricsAgent::default(); + + let point = influxdb::Point::new("live_submit_test") + .add_tag("test", influxdb::Value::Boolean(true)) + .add_field( + "random_bool", + influxdb::Value::Boolean(random::() < 128), + ) + .add_field( + "random_int", + influxdb::Value::Integer(random::() as i64), + ) + .to_owned(); + agent.submit(point); + } + +} diff --git a/src/thin_client.rs b/src/thin_client.rs index 164e519e8..ff0d4a1bc 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -10,8 +10,13 @@ use signature::{KeyPair, PublicKey, Signature}; use std::collections::HashMap; use std::io; use std::net::{SocketAddr, UdpSocket}; +use std::time::Instant; +use timing; use transaction::Transaction; +use influx_db_client as influxdb; +use metrics; + /// An object for querying and sending transactions to the network. pub struct ThinClient { requests_addr: SocketAddr, @@ -100,9 +105,20 @@ impl ThinClient { to: PublicKey, last_id: &Hash, ) -> io::Result { + let now = Instant::now(); let tx = Transaction::new(keypair, to, n, *last_id); let sig = tx.sig; - self.transfer_signed(tx).map(|_| sig) + let result = self.transfer_signed(tx).map(|_| sig); + metrics::submit( + influxdb::Point::new("thinclient") + .add_tag("op", influxdb::Value::String("transfer".to_string())) + .add_field( + "duration_ms", + influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64), + ) + .to_owned(), + ); + result } /// Request the balance of the user holding `pubkey`. This method blocks @@ -183,8 +199,6 @@ impl ThinClient { } pub fn poll_get_balance(&mut self, pubkey: &PublicKey) -> io::Result { - use std::time::Instant; - let mut balance; let now = Instant::now(); loop { @@ -193,7 +207,15 @@ impl ThinClient { break; } } - + metrics::submit( + influxdb::Point::new("thinclient") + .add_tag("op", influxdb::Value::String("get_balance".to_string())) + .add_field( + "duration_ms", + influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64), + ) + .to_owned(), + ); balance } @@ -203,6 +225,7 @@ impl ThinClient { trace!("check_signature"); let req = Request::GetSignature { signature: *sig }; let data = serialize(&req).expect("serialize GetSignature in pub fn check_signature"); + let now = Instant::now(); let mut done = false; while !done { self.requests_socket @@ -216,10 +239,25 @@ impl ThinClient { self.process_response(resp); } } + metrics::submit( + influxdb::Point::new("thinclient") + .add_tag("op", influxdb::Value::String("check_signature".to_string())) + .add_field( + "duration_ms", + influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64), + ) + .to_owned(), + ); self.signature_status } } +impl Drop for ThinClient { + fn drop(&mut self) { + metrics::flush(); + } +} + #[cfg(test)] mod tests { use super::*;