Metrics v0.1

This commit is contained in:
Michael Vines 2018-07-05 21:40:09 -07:00
parent 3ed9567f96
commit 22c0e3cd54
5 changed files with 369 additions and 4 deletions

View File

@ -80,3 +80,4 @@ p2p = "0.5.2"
futures = "0.1.21"
clap = "2.31"
reqwest = "0.8.6"
influx_db_client = "0.3.4"

View File

@ -4,6 +4,8 @@
//! checking requests against a request cap for a given time time_slice
//! and (to come) an IP rate limit.
use influx_db_client as influxdb;
use metrics;
use signature::{KeyPair, PublicKey};
use std::io;
use std::io::{Error, ErrorKind};
@ -121,6 +123,19 @@ impl Drone {
}
if self.check_request_limit(request_amount) {
self.request_current += request_amount;
metrics::submit(
influxdb::Point::new("drone")
.add_tag("op", influxdb::Value::String("airdrop".to_string()))
.add_field(
"request_amount",
influxdb::Value::Integer(request_amount as i64),
)
.add_field(
"request_current",
influxdb::Value::Integer(self.request_current as i64),
)
.to_owned(),
);
client.transfer_signed(tx)
} else {
Err(Error::new(ErrorKind::Other, "token limit reached"))
@ -128,6 +143,12 @@ impl Drone {
}
}
impl Drop for Drone {
fn drop(&mut self) {
metrics::flush();
}
}
#[cfg(test)]
mod tests {
use bank::Bank;

View File

@ -25,6 +25,7 @@ pub mod fullnode;
pub mod hash;
pub mod ledger;
pub mod logger;
pub mod metrics;
pub mod mint;
pub mod nat;
pub mod ncp;
@ -72,4 +73,5 @@ extern crate untrusted;
#[macro_use]
extern crate matches;
extern crate influx_db_client;
extern crate rand;

303
src/metrics.rs Normal file
View File

@ -0,0 +1,303 @@
//! The `metrics` module enables sending measurements to an InfluxDB instance
use influx_db_client as influxdb;
use std::env;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Barrier, Mutex, Once, ONCE_INIT};
use std::thread;
use std::time::{Duration, Instant};
use timing;
#[derive(Debug)]
enum MetricsCommand {
Submit(influxdb::Point),
Flush(Arc<Barrier>),
}
struct MetricsAgent {
sender: Sender<MetricsCommand>,
}
trait MetricsWriter {
// Write the points and empty the vector. Called on the internal
// MetricsAgent worker thread.
fn write(&self, points: Vec<influxdb::Point>);
}
struct InfluxDbMetricsWriter {
client: Option<influxdb::Client>,
}
impl InfluxDbMetricsWriter {
fn new() -> Self {
InfluxDbMetricsWriter {
client: Self::build_client(),
}
}
fn build_client() -> Option<influxdb::Client> {
let host = env::var("INFLUX_HOST").unwrap_or("https://metrics.solana.com:8086".to_string());
let db = env::var("INFLUX_DATABASE").unwrap_or("scratch".to_string());
let username = env::var("INFLUX_USERNAME").unwrap_or("scratch_writer".to_string());
let password = env::var("INFLUX_PASSWORD").unwrap_or("topsecret".to_string());
debug!("InfluxDB host={} db={} username={}", host, db, username);
let mut client = influxdb::Client::new_with_option(host, db, None)
.set_authentication(username, password);
client.set_read_timeout(1 /*second*/);
client.set_write_timeout(1 /*second*/);
debug!("InfluxDB version: {:?}", client.get_version());
Some(client)
}
}
impl MetricsWriter for InfluxDbMetricsWriter {
fn write(&self, points: Vec<influxdb::Point>) {
if let Some(ref client) = self.client {
if let Err(err) = client.write_points(
influxdb::Points { point: points },
Some(influxdb::Precision::Milliseconds),
None,
) {
debug!("InfluxDbMetricsWriter write error: {:?}", err);
}
}
}
}
impl Default for MetricsAgent {
fn default() -> Self {
Self::new(
Arc::new(InfluxDbMetricsWriter::new()),
Duration::from_secs(10),
)
}
}
impl MetricsAgent {
fn new(writer: Arc<MetricsWriter + Send + Sync>, write_frequency: Duration) -> Self {
let (sender, receiver) = channel::<MetricsCommand>();
thread::spawn(move || Self::run(receiver, writer, write_frequency));
MetricsAgent { sender }
}
fn run(
receiver: Receiver<MetricsCommand>,
writer: Arc<MetricsWriter>,
write_frequency: Duration,
) {
trace!("run: enter");
let mut last_write_time = Instant::now();
let mut points = Vec::new();
loop {
match receiver.recv_timeout(write_frequency / 2) {
Ok(cmd) => match cmd {
MetricsCommand::Flush(barrier) => {
debug!("metrics_thread: flush");
if !points.is_empty() {
writer.write(points);
points = Vec::new();
last_write_time = Instant::now();
}
barrier.wait();
}
MetricsCommand::Submit(point) => {
debug!("run: submit {:?}", point);
points.push(point);
}
},
Err(RecvTimeoutError::Timeout) => {
trace!("run: receive timeout");
}
Err(RecvTimeoutError::Disconnected) => {
debug!("run: sender disconnected");
break;
}
}
let now = Instant::now();
if now.duration_since(last_write_time) >= write_frequency {
if !points.is_empty() {
debug!("run: writing {} points", points.len());
writer.write(points);
points = Vec::new();
last_write_time = now;
}
}
}
trace!("run: exit");
}
pub fn submit(&self, mut point: influxdb::Point) {
if point.timestamp.is_none() {
point.timestamp = Some(timing::timestamp() as i64);
}
debug!("Submitting point: {:?}", point);
self.sender.send(MetricsCommand::Submit(point)).unwrap();
}
pub fn flush(&self) {
debug!("Flush");
let barrier = Arc::new(Barrier::new(2));
self.sender
.send(MetricsCommand::Flush(Arc::clone(&barrier)))
.unwrap();
barrier.wait();
}
}
impl Drop for MetricsAgent {
fn drop(&mut self) {
self.flush();
}
}
fn get_singleton_agent() -> Arc<Mutex<MetricsAgent>> {
static INIT: Once = ONCE_INIT;
static mut AGENT: Option<Arc<Mutex<MetricsAgent>>> = None;
unsafe {
INIT.call_once(|| AGENT = Some(Arc::new(Mutex::new(MetricsAgent::default()))));
match AGENT {
Some(ref agent) => agent.clone(),
None => panic!("Failed to initialize metrics agent"),
}
}
}
/// Submits a new point from any thread. Note that points are internally queued
/// and transmitted periodically in batches.
pub fn submit(point: influxdb::Point) {
let agent_mutex = get_singleton_agent();
let agent = agent_mutex.lock().unwrap();
agent.submit(point);
}
/// Blocks until all pending points from previous calls to `submit` have been
/// transmitted.
pub fn flush() {
let agent_mutex = get_singleton_agent();
let agent = agent_mutex.lock().unwrap();
agent.flush();
}
#[cfg(test)]
mod test {
use super::*;
use rand::random;
use std::sync::atomic::{AtomicUsize, Ordering};
struct MockMetricsWriter {
points_written: AtomicUsize,
}
impl MockMetricsWriter {
fn new() -> Self {
MockMetricsWriter {
points_written: AtomicUsize::new(0),
}
}
fn points_written(&self) -> usize {
return self.points_written.load(Ordering::SeqCst);
}
}
impl MetricsWriter for MockMetricsWriter {
fn write(&self, points: Vec<influxdb::Point>) {
assert!(!points.is_empty());
self.points_written
.fetch_add(points.len(), Ordering::SeqCst);
println!(
"Writing {} points ({} total)",
points.len(),
self.points_written.load(Ordering::SeqCst)
);
}
}
#[test]
fn test_submit() {
let writer = Arc::new(MockMetricsWriter::new());
let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(10));
for i in 0..42 {
agent.submit(influxdb::Point::new(&format!("measurement {}", i)));
}
agent.flush();
assert_eq!(writer.points_written(), 42);
}
#[test]
fn test_submit_with_delay() {
let writer = Arc::new(MockMetricsWriter::new());
let agent = MetricsAgent::new(writer.clone(), Duration::from_millis(100));
agent.submit(influxdb::Point::new("point 1"));
thread::sleep(Duration::from_secs(2));
assert_eq!(writer.points_written(), 1);
}
#[test]
fn test_multithread_submit() {
let writer = Arc::new(MockMetricsWriter::new());
let agent = Arc::new(Mutex::new(MetricsAgent::new(
writer.clone(),
Duration::from_secs(10),
)));
//
// Submit measurements from different threads
//
let mut threads = Vec::new();
for i in 0..42 {
let point = influxdb::Point::new(&format!("measurement {}", i));
let agent = Arc::clone(&agent);
threads.push(thread::spawn(move || {
agent.lock().unwrap().submit(point);
}));
}
for thread in threads {
thread.join().unwrap();
}
agent.lock().unwrap().flush();
assert_eq!(writer.points_written(), 42);
}
#[test]
fn test_flush_before_drop() {
let writer = Arc::new(MockMetricsWriter::new());
{
let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(9999999));
agent.submit(influxdb::Point::new("point 1"));
}
assert_eq!(writer.points_written(), 1);
}
#[test]
fn test_live_submit() {
let agent = MetricsAgent::default();
let point = influxdb::Point::new("live_submit_test")
.add_tag("test", influxdb::Value::Boolean(true))
.add_field(
"random_bool",
influxdb::Value::Boolean(random::<u8>() < 128),
)
.add_field(
"random_int",
influxdb::Value::Integer(random::<u8>() as i64),
)
.to_owned();
agent.submit(point);
}
}

View File

@ -10,8 +10,13 @@ use signature::{KeyPair, PublicKey, Signature};
use std::collections::HashMap;
use std::io;
use std::net::{SocketAddr, UdpSocket};
use std::time::Instant;
use timing;
use transaction::Transaction;
use influx_db_client as influxdb;
use metrics;
/// An object for querying and sending transactions to the network.
pub struct ThinClient {
requests_addr: SocketAddr,
@ -100,9 +105,20 @@ impl ThinClient {
to: PublicKey,
last_id: &Hash,
) -> io::Result<Signature> {
let now = Instant::now();
let tx = Transaction::new(keypair, to, n, *last_id);
let sig = tx.sig;
self.transfer_signed(tx).map(|_| sig)
let result = self.transfer_signed(tx).map(|_| sig);
metrics::submit(
influxdb::Point::new("thinclient")
.add_tag("op", influxdb::Value::String("transfer".to_string()))
.add_field(
"duration_ms",
influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64),
)
.to_owned(),
);
result
}
/// Request the balance of the user holding `pubkey`. This method blocks
@ -183,8 +199,6 @@ impl ThinClient {
}
pub fn poll_get_balance(&mut self, pubkey: &PublicKey) -> io::Result<i64> {
use std::time::Instant;
let mut balance;
let now = Instant::now();
loop {
@ -193,7 +207,15 @@ impl ThinClient {
break;
}
}
metrics::submit(
influxdb::Point::new("thinclient")
.add_tag("op", influxdb::Value::String("get_balance".to_string()))
.add_field(
"duration_ms",
influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64),
)
.to_owned(),
);
balance
}
@ -203,6 +225,7 @@ impl ThinClient {
trace!("check_signature");
let req = Request::GetSignature { signature: *sig };
let data = serialize(&req).expect("serialize GetSignature in pub fn check_signature");
let now = Instant::now();
let mut done = false;
while !done {
self.requests_socket
@ -216,10 +239,25 @@ impl ThinClient {
self.process_response(resp);
}
}
metrics::submit(
influxdb::Point::new("thinclient")
.add_tag("op", influxdb::Value::String("check_signature".to_string()))
.add_field(
"duration_ms",
influxdb::Value::Integer(timing::duration_as_ms(&now.elapsed()) as i64),
)
.to_owned(),
);
self.signature_status
}
}
impl Drop for ThinClient {
fn drop(&mut self) {
metrics::flush();
}
}
#[cfg(test)]
mod tests {
use super::*;