fix streaming bug + no_std attempt with util as the testbench
This commit is contained in:
parent
91be4d0aad
commit
b8bee9b19e
|
@ -32,23 +32,27 @@ harness = false
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
metrics = { version = "0.13.0-alpha.1", path = "../metrics", features = ["std"] }
|
metrics = { version = "0.13.0-alpha.1", path = "../metrics", features = ["std"] }
|
||||||
crossbeam-epoch = "0.9"
|
crossbeam-epoch = { version = "0.9", optional = true }
|
||||||
crossbeam-utils = "0.8"
|
crossbeam-utils = { version = "0.8", default-features = false }
|
||||||
serde = "1.0"
|
arc-swap = { version = "0.4", optional = true }
|
||||||
arc-swap = "0.4"
|
atomic-shim = { version = "0.1", optional = true }
|
||||||
atomic-shim = "0.1"
|
|
||||||
parking_lot = "0.11"
|
|
||||||
aho-corasick = { version = "0.7", optional = true }
|
aho-corasick = { version = "0.7", optional = true }
|
||||||
dashmap = "3"
|
dashmap = { version = "3", optional = true }
|
||||||
indexmap = "1.6"
|
indexmap = { version = "1.6", optional = true }}
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
bolero = "0.5"
|
||||||
criterion = "0.3"
|
criterion = "0.3"
|
||||||
lazy_static = "1.3"
|
lazy_static = "1.3"
|
||||||
rand = { version = "0.7", features = ["small_rng"] }
|
rand = { version = "0.7", features = ["small_rng"] }
|
||||||
rand_distr = "0.3"
|
rand_distr = "0.3"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["std", "layer-filter"]
|
default = ["std"]
|
||||||
std = []
|
std = ["arc-swap", "atomic-shim", "crossbeam-epoch", "dashmap", "indexmap"]
|
||||||
layer-filter = ["aho-corasick"]
|
layer-filter = ["aho-corasick"]
|
||||||
|
|
||||||
|
[[test]]
|
||||||
|
name = "streaming"
|
||||||
|
path = "tests/streaming/fuzz_target.rs"
|
||||||
|
harness = false
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::hash::{Hash, Hasher};
|
use core::hash::{Hash, Hasher};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use crate::{handle::Handle, registry::Registry};
|
use crate::{handle::Handle, registry::Registry};
|
||||||
|
|
|
@ -1,24 +1,33 @@
|
||||||
//! Helper types and functions used within the metrics ecosystem.
|
//! Helper types and functions used within the metrics ecosystem.
|
||||||
#![deny(missing_docs)]
|
#![deny(missing_docs)]
|
||||||
|
#![cfg_attr(not(feature = "std"), no_std)]
|
||||||
|
|
||||||
|
#[cfg(feature = "std")]
|
||||||
mod bucket;
|
mod bucket;
|
||||||
|
#[cfg(feature = "std")]
|
||||||
pub use bucket::AtomicBucket;
|
pub use bucket::AtomicBucket;
|
||||||
|
|
||||||
|
#[cfg(feature = "std")]
|
||||||
mod debugging;
|
mod debugging;
|
||||||
|
#[cfg(feature = "std")]
|
||||||
pub use debugging::{DebugValue, DebuggingRecorder, MetricKind, Snapshotter};
|
pub use debugging::{DebugValue, DebuggingRecorder, MetricKind, Snapshotter};
|
||||||
|
|
||||||
|
#[cfg(feature = "std")]
|
||||||
mod handle;
|
mod handle;
|
||||||
|
#[cfg(feature = "std")]
|
||||||
pub use handle::Handle;
|
pub use handle::Handle;
|
||||||
|
|
||||||
|
#[cfg(feature = "std")]
|
||||||
mod streaming;
|
mod streaming;
|
||||||
|
#[cfg(feature = "std")]
|
||||||
pub use streaming::StreamingIntegers;
|
pub use streaming::StreamingIntegers;
|
||||||
|
|
||||||
mod quantile;
|
mod quantile;
|
||||||
pub use quantile::{parse_quantiles, Quantile};
|
pub use quantile::{parse_quantiles, Quantile};
|
||||||
|
|
||||||
mod tree;
|
#[cfg(feature = "std")]
|
||||||
pub use tree::{Integer, MetricsTree};
|
|
||||||
|
|
||||||
mod registry;
|
mod registry;
|
||||||
|
#[cfg(feature = "std")]
|
||||||
pub use registry::Registry;
|
pub use registry::Registry;
|
||||||
|
|
||||||
mod key;
|
mod key;
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::hash::Hash;
|
use core::hash::Hash;
|
||||||
|
|
||||||
/// A high-performance metric registry.
|
/// A high-performance metric registry.
|
||||||
///
|
///
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use std::slice;
|
use core::slice;
|
||||||
|
|
||||||
/// A compressed set of integers.
|
/// A compressed set of integers.
|
||||||
///
|
///
|
||||||
|
@ -50,7 +50,7 @@ use std::slice;
|
||||||
pub struct StreamingIntegers {
|
pub struct StreamingIntegers {
|
||||||
inner: Vec<u8>,
|
inner: Vec<u8>,
|
||||||
len: usize,
|
len: usize,
|
||||||
last: Option<i64>,
|
last: Option<i128>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StreamingIntegers {
|
impl StreamingIntegers {
|
||||||
|
@ -99,7 +99,7 @@ impl StreamingIntegers {
|
||||||
// a delta value.
|
// a delta value.
|
||||||
let mut src_idx = 0;
|
let mut src_idx = 0;
|
||||||
if self.last.is_none() {
|
if self.last.is_none() {
|
||||||
let first = src[src_idx] as i64;
|
let first = src[src_idx] as i128;
|
||||||
self.last = Some(first);
|
self.last = Some(first);
|
||||||
|
|
||||||
let zigzag = zigzag_encode(first);
|
let zigzag = zigzag_encode(first);
|
||||||
|
@ -112,7 +112,8 @@ impl StreamingIntegers {
|
||||||
let mut last = self.last.unwrap();
|
let mut last = self.last.unwrap();
|
||||||
|
|
||||||
while src_idx < src_len {
|
while src_idx < src_len {
|
||||||
let value = src[src_idx] as i64;
|
let value = src[src_idx] as i128;
|
||||||
|
// attempted to subtract with overflow
|
||||||
let diff = value - last;
|
let diff = value - last;
|
||||||
let zigzag = zigzag_encode(diff);
|
let zigzag = zigzag_encode(diff);
|
||||||
buf_idx = vbyte_encode(zigzag, &mut buf, buf_idx);
|
buf_idx = vbyte_encode(zigzag, &mut buf, buf_idx);
|
||||||
|
@ -194,17 +195,17 @@ impl StreamingIntegers {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn zigzag_encode(input: i64) -> u64 {
|
fn zigzag_encode(input: i128) -> u128 {
|
||||||
((input << 1) ^ (input >> 63)) as u64
|
((input << 1) ^ (input >> 127)) as u128
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn zigzag_decode(input: u64) -> i64 {
|
fn zigzag_decode(input: u128) -> i128 {
|
||||||
((input >> 1) as i64) ^ (-((input & 1) as i64))
|
((input >> 1) as i128) ^ (-((input & 1) as i128))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn vbyte_encode(mut input: u64, buf: &mut [u8], mut buf_idx: usize) -> usize {
|
fn vbyte_encode(mut input: u128, buf: &mut [u8], mut buf_idx: usize) -> usize {
|
||||||
while input >= 128 {
|
while input >= 128 {
|
||||||
buf[buf_idx] = 0x80 as u8 | (input as u8 & 0x7F);
|
buf[buf_idx] = 0x80 as u8 | (input as u8 & 0x7F);
|
||||||
buf_idx += 1;
|
buf_idx += 1;
|
||||||
|
@ -215,11 +216,11 @@ fn vbyte_encode(mut input: u64, buf: &mut [u8], mut buf_idx: usize) -> usize {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn vbyte_decode(buf: &[u8], mut buf_idx: usize) -> (u64, usize) {
|
fn vbyte_decode(buf: &[u8], mut buf_idx: usize) -> (u128, usize) {
|
||||||
let mut tmp = 0;
|
let mut tmp = 0;
|
||||||
let mut factor = 0;
|
let mut factor = 0;
|
||||||
loop {
|
loop {
|
||||||
tmp |= u64::from(buf[buf_idx] & 0x7F) << (7 * factor);
|
tmp |= u128::from(buf[buf_idx] & 0x7F) << (7 * factor);
|
||||||
if buf[buf_idx] & 0x80 != 0x80 {
|
if buf[buf_idx] & 0x80 != 0x80 {
|
||||||
return (tmp, buf_idx + 1);
|
return (tmp, buf_idx + 1);
|
||||||
}
|
}
|
||||||
|
@ -240,6 +241,22 @@ mod tests {
|
||||||
assert_eq!(decompressed.len(), 0);
|
assert_eq!(decompressed.len(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_streaming_integers_edge_cases() {
|
||||||
|
let mut si = StreamingIntegers::new();
|
||||||
|
let decompressed = si.decompress();
|
||||||
|
assert_eq!(decompressed.len(), 0);
|
||||||
|
|
||||||
|
let values = vec![
|
||||||
|
140754668284938,
|
||||||
|
9223372079804448768,
|
||||||
|
];
|
||||||
|
si.compress(&values);
|
||||||
|
|
||||||
|
let decompressed = si.decompress();
|
||||||
|
assert_eq!(decompressed, values);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_streaming_integers_single_block() {
|
fn test_streaming_integers_single_block() {
|
||||||
let mut si = StreamingIntegers::new();
|
let mut si = StreamingIntegers::new();
|
||||||
|
|
|
@ -1,122 +0,0 @@
|
||||||
use serde::ser::{Serialize, Serializer};
|
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
/// An integer metric value.
|
|
||||||
pub enum Integer {
|
|
||||||
/// A signed value.
|
|
||||||
Signed(i64),
|
|
||||||
|
|
||||||
/// An unsigned value.
|
|
||||||
Unsigned(u64),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<i64> for Integer {
|
|
||||||
fn from(i: i64) -> Integer {
|
|
||||||
Integer::Signed(i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<u64> for Integer {
|
|
||||||
fn from(i: u64) -> Integer {
|
|
||||||
Integer::Unsigned(i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum TreeEntry {
|
|
||||||
Value(Integer),
|
|
||||||
Nested(MetricsTree),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Serialize for TreeEntry {
|
|
||||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
||||||
where
|
|
||||||
S: Serializer,
|
|
||||||
{
|
|
||||||
match self {
|
|
||||||
TreeEntry::Value(value) => match value {
|
|
||||||
Integer::Signed(i) => serializer.serialize_i64(*i),
|
|
||||||
Integer::Unsigned(i) => serializer.serialize_u64(*i),
|
|
||||||
},
|
|
||||||
TreeEntry::Nested(tree) => tree.serialize(serializer),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A tree-structured metrics container.
|
|
||||||
///
|
|
||||||
/// Used for building a tree structure out of scoped metrics, where each level in the tree
|
|
||||||
/// represents a nested scope.
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct MetricsTree {
|
|
||||||
contents: HashMap<String, TreeEntry>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MetricsTree {
|
|
||||||
/// Inserts a single value into the tree.
|
|
||||||
pub fn insert_value<V: Into<Integer>>(
|
|
||||||
&mut self,
|
|
||||||
mut levels: Vec<String>,
|
|
||||||
key: String,
|
|
||||||
value: V,
|
|
||||||
) {
|
|
||||||
match levels.len() {
|
|
||||||
0 => {
|
|
||||||
self.contents.insert(key, TreeEntry::Value(value.into()));
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
let name = levels.remove(0);
|
|
||||||
let inner = self
|
|
||||||
.contents
|
|
||||||
.entry(name)
|
|
||||||
.or_insert_with(|| TreeEntry::Nested(MetricsTree::default()));
|
|
||||||
|
|
||||||
if let TreeEntry::Nested(tree) = inner {
|
|
||||||
tree.insert_value(levels, key, value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Inserts multiple values into the tree.
|
|
||||||
pub fn insert_values<V: Into<Integer>>(
|
|
||||||
&mut self,
|
|
||||||
mut levels: Vec<String>,
|
|
||||||
values: Vec<(String, V)>,
|
|
||||||
) {
|
|
||||||
match levels.len() {
|
|
||||||
0 => {
|
|
||||||
for v in values.into_iter() {
|
|
||||||
self.contents.insert(v.0, TreeEntry::Value(v.1.into()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
let name = levels.remove(0);
|
|
||||||
let inner = self
|
|
||||||
.contents
|
|
||||||
.entry(name)
|
|
||||||
.or_insert_with(|| TreeEntry::Nested(MetricsTree::default()));
|
|
||||||
|
|
||||||
if let TreeEntry::Nested(tree) = inner {
|
|
||||||
tree.insert_values(levels, values);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Clears all entries in the tree.
|
|
||||||
pub fn clear(&mut self) {
|
|
||||||
self.contents.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Serialize for MetricsTree {
|
|
||||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
||||||
where
|
|
||||||
S: Serializer,
|
|
||||||
{
|
|
||||||
let mut sorted = self.contents.iter().collect::<Vec<_>>();
|
|
||||||
sorted.sort_by_key(|p| p.0);
|
|
||||||
|
|
||||||
serializer.collect_map(sorted)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1 @@
|
||||||
|
糜<EFBFBD>箒筱sssェ>
|
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>s<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ss<EFBFBD>><3E><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
|
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD><EFBFBD><EFBFBD><EFBFBD>鮊<EFBFBD>N
|
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD>徉<EFBFBD><EFBFBD><EFBFBD><EFBFBD>瞁
|
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
|
Binary file not shown.
|
@ -0,0 +1,2 @@
|
||||||
|
íÿÿÿÿÿÿNíYí«ÍíYí«XXX
|
||||||
|
Á(y
|
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1 @@
|
||||||
|
ォ粐粐粐粐粐
|
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD>忖
|
|
@ -0,0 +1 @@
|
||||||
|
瞁<EFBFBD><EFBFBD><EFBFBD>薶滫
|
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1,2 @@
|
||||||
|
<EFBFBD><EFBFBD>N倢戓ヘXX
|
||||||
|
チ(y戓(
|
Binary file not shown.
|
@ -0,0 +1 @@
|
||||||
|
Íí(Y((Y(*
|
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>s<EFBFBD>s<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
|
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD><12><><EFBFBD><EFBFBD>N<EFBFBD><4E>
|
|
@ -0,0 +1 @@
|
||||||
|
ííÍNí˙ĺíA
|
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1 @@
|
||||||
|
|
Binary file not shown.
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
|
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD><EFBFBD><EFBFBD><EFBFBD>鮊<EFBFBD><EFBFBD>鮊
|
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>K
|
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><04>
|
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD>
|
Binary file not shown.
|
@ -0,0 +1 @@
|
||||||
|
澵澵戓ォ
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD><EFBFBD><EFBFBD><0E>
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>(<28><>(
|
|
@ -0,0 +1,4 @@
|
||||||
|
<EFBFBD><EFBFBD>Y#<23><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>&XY<58>XY<58>X<EFBFBD><58><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
|
||||||
|
X<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
|
||||||
|
X
|
||||||
|
<EFBFBD><EFBFBD><EFBFBD><EFBFBD>(<28>y
|
|
@ -0,0 +1,2 @@
|
||||||
|
ííYí«ÍíYí«XXX
|
||||||
|
Á(y
|
Binary file not shown.
|
@ -0,0 +1,2 @@
|
||||||
|
<EFBFBD><EFBFBD>Y<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>&XY<58>X<EFBFBD><58><EFBFBD><EFBFBD><EFBFBD><EFBFBD>X
|
||||||
|
(<28>y
|
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1,2 @@
|
||||||
|
<EFBFBD>
|
||||||
|
<EFBFBD><EFBFBD>
|
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD><EFBFBD><EFBFBD><0E>
|
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1 @@
|
||||||
|
ォォ
|
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
|
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD>
|
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1 @@
|
||||||
|
ォ粐z粐粐
|
|
@ -0,0 +1 @@
|
||||||
|
迯<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
|
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD><EFBFBD><EFBFBD><EFBFBD>鮊<EFBFBD>鮊
|
|
@ -0,0 +1,3 @@
|
||||||
|
ííXX
|
||||||
|
ÁXXX
|
||||||
|
Á(yäää)y
|
|
@ -0,0 +1 @@
|
||||||
|
<EFBFBD>
|
|
@ -0,0 +1,9 @@
|
||||||
|
use bolero::fuzz;
|
||||||
|
use metrics_util::StreamingIntegers;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
fuzz!().with_type().for_each(|value: &Vec<u64>| {
|
||||||
|
let mut si = StreamingIntegers::new();
|
||||||
|
si.compress(&value);
|
||||||
|
});
|
||||||
|
}
|
Loading…
Reference in New Issue