diff --git a/src/bank.rs b/src/bank.rs index 54033e90c8..2d9753d52e 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -6,7 +6,7 @@ extern crate libc; use chrono::prelude::*; -use counter::{self, Counter}; +use counter::Counter; use entry::Entry; use hash::Hash; use itertools::Itertools; @@ -204,17 +204,9 @@ impl Bank { let option = bals.get_mut(&tx.from); if option.is_none() { if let Instruction::NewVote(_) = &tx.instruction { - static mut COUNTER_VOTE_ACCOUNT_NOT_FOUND: Counter = create_counter!( - "bank-appy_debits-vote_account_not_found", - counter::DEFAULT_LOG_RATE - ); - inc_counter!(COUNTER_VOTE_ACCOUNT_NOT_FOUND, 1); + inc_new_counter!("bank-appy_debits-vote_account_not_found", 1); } else { - static mut COUNTER_ACCOUNT_NOT_FOUND: Counter = create_counter!( - "bank-appy_debits-generic_account_not_found", - counter::DEFAULT_LOG_RATE - ); - inc_counter!(COUNTER_ACCOUNT_NOT_FOUND, 1); + inc_new_counter!("bank-appy_debits-generic_account_not_found", 1); } return Err(BankError::AccountNotFound(tx.from)); } diff --git a/src/banking_stage.rs b/src/banking_stage.rs index c8b7f6de35..ebd270af78 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -89,7 +89,6 @@ impl BankingStage { mms.len(), ); let count = mms.iter().map(|x| x.1.len()).sum(); - static mut COUNTER: Counter = create_counter!("banking_stage_process_packets", 1); let proc_start = Instant::now(); for (msgs, vers) in mms { let transactions = Self::deserialize_transactions(&msgs.read().unwrap()); @@ -125,7 +124,7 @@ impl BankingStage { reqs_len, (reqs_len as f32) / (total_time_s) ); - inc_counter!(COUNTER, count); + inc_new_counter!("banking_stage-process_packets", count); Ok(()) } } diff --git a/src/counter.rs b/src/counter.rs index ff77388b4e..afdac25a3a 100644 --- a/src/counter.rs +++ b/src/counter.rs @@ -4,7 +4,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use timing; const INFLUX_RATE: usize = 100; -pub const DEFAULT_LOG_RATE: usize = 10; pub struct Counter { pub name: &'static str, @@ -34,6 +33,17 @@ macro_rules! inc_counter { }; } +macro_rules! inc_new_counter { + ($name:expr, $count:expr) => {{ + static mut INC_NEW_COUNTER: Counter = create_counter!($name, 10); + inc_counter!(INC_NEW_COUNTER, $count); + }}; + ($name:expr, $count:expr, $lograte:expr) => {{ + static mut INC_NEW_COUNTER: Counter = create_counter!($name, $lograte); + inc_counter!(INC_NEW_COUNTER, $count); + }}; +} + impl Counter { pub fn inc(&mut self, events: usize) { let counts = self.counts.fetch_add(events, Ordering::Relaxed); @@ -89,4 +99,11 @@ mod tests { assert_eq!(COUNTER.lastlog.load(Ordering::Relaxed), 399); } } + #[test] + fn test_inc_new_counter() { + //make sure that macros are syntactically correct + //the variable is internal to the macro scope so there is no way to introspect it + inc_new_counter!("counter-1", 1); + inc_new_counter!("counter-2", 1, 2); + } } diff --git a/src/crdt.rs b/src/crdt.rs index d8d42b4e80..aef273b2d9 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -37,8 +37,6 @@ use streamer::{BlobReceiver, BlobSender, Window}; use timing::timestamp; use transaction::Vote; -const LOG_RATE: usize = 10; - /// milliseconds we sleep for between gossip requests const GOSSIP_SLEEP_MILLIS: u64 = 100; const GOSSIP_PURGE_MILLIS: u64 = 15000; @@ -348,8 +346,7 @@ impl Crdt { } } pub fn insert_votes(&mut self, votes: &[(PublicKey, Vote, Hash)]) { - static mut COUNTER_VOTE: Counter = create_counter!("crdt-vote-count", LOG_RATE); - inc_counter!(COUNTER_VOTE, votes.len()); + inc_new_counter!("crdt-vote-count", votes.len()); if !votes.is_empty() { info!("{:x}: INSERTING VOTES {}", self.debug_id(), votes.len()); } @@ -373,8 +370,7 @@ impl Crdt { self.update_index += 1; let _ = self.table.insert(v.id, v.clone()); let _ = self.local.insert(v.id, self.update_index); - static mut COUNTER_UPDATE: Counter = create_counter!("crdt-update-count", LOG_RATE); - inc_counter!(COUNTER_UPDATE, 1); + inc_new_counter!("crdt-update-count", 1); } else { trace!( "{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}", @@ -446,8 +442,7 @@ impl Crdt { }) .collect(); - static mut COUNTER_PURGE: Counter = create_counter!("crdt-purge-count", LOG_RATE); - inc_counter!(COUNTER_PURGE, dead_ids.len()); + inc_new_counter!("crdt-purge-count", dead_ids.len()); for id in &dead_ids { self.alive.remove(id); @@ -898,24 +893,18 @@ impl Crdt { outblob.meta.set_addr(&from.contact_info.tvu_window); outblob.set_id(sender_id).expect("blob set_id"); } - static mut COUNTER_REQ_WINDOW_PASS: Counter = - create_counter!("crdt-window-request-pass", LOG_RATE); - inc_counter!(COUNTER_REQ_WINDOW_PASS, 1); + inc_new_counter!("crdt-window-request-pass", 1); return Some(out); } else { - static mut COUNTER_REQ_WINDOW_OUTSIDE: Counter = - create_counter!("crdt-window-request-outside", LOG_RATE); - inc_counter!(COUNTER_REQ_WINDOW_OUTSIDE, 1); + inc_new_counter!("crdt-window-request-outside", 1); info!( "requested ix {} != blob_ix {}, outside window!", ix, blob_ix ); } } else { - static mut COUNTER_REQ_WINDOW_FAIL: Counter = - create_counter!("crdt-window-request-fail", LOG_RATE); - inc_counter!(COUNTER_REQ_WINDOW_FAIL, 1); + inc_new_counter!("crdt-window-request-fail", 1); assert!(window.read().unwrap()[pos].is_none()); info!( "{:x}: failed RequestWindowIndex {:x} {} {}", @@ -991,9 +980,7 @@ impl Crdt { //TODO verify from is signed obj.write().unwrap().insert(&from); let me = obj.read().unwrap().my_data().clone(); - static mut COUNTER_REQ_WINDOW: Counter = - create_counter!("crdt-window-request-recv", LOG_RATE); - inc_counter!(COUNTER_REQ_WINDOW, 1); + inc_new_counter!("crdt-window-request-recv", 1); trace!( "{:x}:received RequestWindowIndex {:x} {} ", me.debug_id(), diff --git a/src/packet.rs b/src/packet.rs index 33c6ffc3a7..50c67c069b 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -19,7 +19,6 @@ pub type SharedBlobs = VecDeque; pub type PacketRecycler = Recycler; pub type BlobRecycler = Recycler; -const LOG_RATE: usize = 10; pub const NUM_PACKETS: usize = 1024 * 8; pub const BLOB_SIZE: usize = 64 * 1024; pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE; @@ -188,7 +187,6 @@ impl Recycler { impl Packets { fn run_read_from(&mut self, socket: &UdpSocket) -> Result { - static mut COUNTER: Counter = create_counter!("packets", LOG_RATE); self.packets.resize(NUM_PACKETS, Packet::default()); let mut i = 0; //DOCUMENTED SIDE-EFFECT @@ -203,7 +201,7 @@ impl Packets { trace!("receiving on {}", socket.local_addr().unwrap()); match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { - inc_counter!(COUNTER, i); + inc_new_counter!("packets-recv_count", 1); debug!("got {:?} messages on {}", i, socket.local_addr().unwrap()); break; } diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 1cb627101d..07c151fab4 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -2,7 +2,7 @@ use bank::Bank; use bincode::serialize; -use counter::{Counter, DEFAULT_LOG_RATE}; +use counter::Counter; use crdt::Crdt; use ledger; use packet::BlobRecycler; @@ -52,14 +52,10 @@ impl ReplicateStage { let mut wcrdt = crdt.write().unwrap(); wcrdt.insert_votes(&votes); }; - { - static mut COUNTER_REPLICATE: Counter = - create_counter!("replicate-transactions", DEFAULT_LOG_RATE); - inc_counter!( - COUNTER_REPLICATE, - entries.iter().map(|x| x.transactions.len()).sum() - ); - } + inc_new_counter!( + "replicate-transactions", + entries.iter().map(|x| x.transactions.len()).sum() + ); let res = bank.process_entries(entries); if res.is_err() { error!("process_entries {} {:?}", blobs_len, res); @@ -84,11 +80,7 @@ impl ReplicateStage { blob.meta.set_addr(&addr); blob.meta.size = len; } - { - static mut COUNTER_REPLICATE_VOTE: Counter = - create_counter!("replicate-vote_sent", DEFAULT_LOG_RATE); - inc_counter!(COUNTER_REPLICATE_VOTE, 1); - } + inc_new_counter!("replicate-vote_sent", 1); *last_vote = now; vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?; diff --git a/src/sigverify.rs b/src/sigverify.rs index 1264626f14..29d58bae63 100644 --- a/src/sigverify.rs +++ b/src/sigverify.rs @@ -77,7 +77,6 @@ fn batch_size(batches: &[SharedPackets]) -> usize { #[cfg(not(feature = "cuda"))] pub fn ed25519_verify(batches: &Vec) -> Vec> { use rayon::prelude::*; - static mut COUNTER: Counter = create_counter!("ed25519_verify", 1); let count = batch_size(batches); info!("CPU ECDSA for {}", batch_size(batches)); let rv = batches @@ -91,7 +90,7 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { .collect() }) .collect(); - inc_counter!(COUNTER, count); + inc_new_counter!("ed25519_verify", count); rv } @@ -109,7 +108,6 @@ pub fn init() { #[cfg(feature = "cuda")] pub fn ed25519_verify(batches: &Vec) -> Vec> { use packet::PACKET_DATA_SIZE; - static mut COUNTER: Counter = create_counter!("ed25519_verify_cuda", 1); let count = batch_size(batches); info!("CUDA ECDSA for {}", batch_size(batches)); let mut out = Vec::new(); @@ -169,7 +167,7 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { num += 1; } } - inc_counter!(COUNTER, count); + inc_new_counter!("ed25519_verify", count); rvs } diff --git a/src/streamer.rs b/src/streamer.rs index cb6c519285..26f01d973e 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -18,7 +18,6 @@ use std::sync::{Arc, RwLock}; use std::thread::{Builder, JoinHandle}; use std::time::Duration; -const LOG_RATE: usize = 10; pub const WINDOW_SIZE: u64 = 2 * 1024; pub type PacketReceiver = Receiver; pub type PacketSender = Sender; @@ -224,9 +223,7 @@ fn repair_window( let reqs = find_next_missing(locked_window, crdt, consumed, received)?; trace!("{:x}: repair_window missing: {}", debug_id, reqs.len()); if !reqs.is_empty() { - static mut COUNTER_REPAIR: Counter = - create_counter!("streamer-repair_window-repair", LOG_RATE); - inc_counter!(COUNTER_REPAIR, reqs.len()); + inc_new_counter!("streamer-repair_window-repair", reqs.len()); debug!( "{:x}: repair_window counter times: {} consumed: {} received: {} missing: {}", debug_id, @@ -301,9 +298,7 @@ fn retransmit_all_leader_blocks( *received, retransmit_queue.len(), ); - static mut COUNTER_RETRANSMIT: Counter = - create_counter!("streamer-recv_window-retransmit", LOG_RATE); - inc_counter!(COUNTER_RETRANSMIT, retransmit_queue.len()); + inc_new_counter!("streamer-recv_window-retransmit", retransmit_queue.len()); retransmit.send(retransmit_queue)?; } Ok(()) @@ -413,8 +408,7 @@ fn recv_window( while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq) } - static mut COUNTER_RECV: Counter = create_counter!("streamer-recv_window-recv", LOG_RATE); - inc_counter!(COUNTER_RECV, dq.len()); + inc_new_counter!("streamer-recv_window-recv", dq.len()); debug!( "{:x}: RECV_WINDOW {} {}: got packets {}", debug_id, @@ -480,9 +474,7 @@ fn recv_window( consume_queue.len(), ); trace!("sending consume_queue.len: {}", consume_queue.len()); - static mut COUNTER_CONSUME: Counter = - create_counter!("streamer-recv_window-consume", LOG_RATE); - inc_counter!(COUNTER_CONSUME, consume_queue.len()); + inc_new_counter!("streamer-recv_window-consume", consume_queue.len()); s.send(consume_queue)?; } Ok(()) @@ -647,9 +639,7 @@ fn broadcast( // Index the blobs Crdt::index_blobs(&me, &blobs, receive_index)?; // keep the cache of blobs that are broadcast - static mut COUNTER_BROADCAST: Counter = - create_counter!("streamer-broadcast-sent", LOG_RATE); - inc_counter!(COUNTER_BROADCAST, blobs.len()); + inc_new_counter!("streamer-broadcast-sent", blobs.len()); { let mut win = window.write().unwrap(); assert!(blobs.len() <= win.len());