Major additions to metrics-util + formatting/documentation polish.

We've added two new major types to the crate:
- AtomicBucket, which allows queue-style atomic writes with atomic
snapshots, powered by crossbeam-epoch
- StreamingIntegers, a scalar delta/zigzag/variable-byte integer
compression implementation

These types are a major part of reworking metrics to be event loop-less
and may be eventually be spun out into their own standalone crates, and
they have value outside of just metrics.

We've also really leveled up our documentation and benchmarks, and these
two types now have full benchmark suites to better demonstrate their
value and their performance on a given system.
This commit is contained in:
Toby Lawrence 2019-05-27 12:30:28 -04:00
parent 3a0aceab68
commit 29eb316438
7 changed files with 1027 additions and 43 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "metrics-util"
version = "0.1.0"
version = "0.2.0"
authors = ["Toby Lawrence <toby@nuclearfurnace.com>"]
edition = "2018"
@ -16,4 +16,19 @@ readme = "README.md"
keywords = ["metrics", "quantile", "percentile"]
[[bench]]
name = "bucket"
harness = false
[[bench]]
name = "streaming_integers"
harness = false
[dependencies]
crossbeam-epoch = "^0.7"
[dev-dependencies]
crossbeam = "^0.7"
criterion = "^0.2.9"
lazy_static = "^1.3"
rand = "^0.6"

View File

@ -0,0 +1,60 @@
#[macro_use]
extern crate criterion;
#[macro_use]
extern crate lazy_static;
use criterion::{Benchmark, Criterion, Throughput};
use metrics_util::AtomicBucket;
lazy_static! {
static ref RANDOM_INTS: Vec<u64> = vec![
21061184, 21301862, 21331592, 21457012, 21500016, 21537837, 21581557, 21620030, 21664102,
21678463, 21708437, 21751808, 21845243, 21850265, 21938879, 21971014, 22005842, 22034601,
22085552, 22101746, 22115429, 22139883, 22260209, 22270768, 22298080, 22299780, 22307659,
22354697, 22355668, 22359397, 22463872, 22496590, 22590978, 22603740, 22706352, 22820895,
22849491, 22891538, 22912955, 22919915, 22928920, 22968656, 22985992, 23033739, 23061395,
23077554, 23138588, 23185172, 23282479, 23290830, 23316844, 23386911, 23641319, 23677058,
23742930, 25350389, 25399746, 25404925, 25464391, 25478415, 25480015, 25632783, 25639769,
25645612, 25688228, 25724427, 25862192, 25954476, 25994479, 26008752, 26036460, 26038202,
26078874, 26118327, 26132679, 26207601, 26262418, 26270737, 26274860, 26431248, 26434268,
26562736, 26580134, 26593740, 26618561, 26844181, 26866971, 26907883, 27005270, 27023584,
27024044, 27057184, 23061395, 23077554, 23138588, 23185172, 23282479, 23290830, 23316844,
23386911, 23641319, 23677058, 23742930, 25350389, 25399746, 25404925, 25464391, 25478415,
25480015, 25632783, 25639769, 25645612, 25688228, 25724427, 25862192, 25954476, 25994479,
26008752, 26036460, 26038202, 26078874, 26118327, 26132679, 26207601, 26262418, 26270737,
26274860, 26431248, 26434268, 26562736, 26580134, 26593740, 26618561, 26844181, 26866971,
26907883, 27005270, 27023584, 27024044, 27057184, 23061395, 23077554, 23138588, 23185172,
23282479, 23290830, 23316844, 23386911, 23641319, 23677058, 23742930, 25350389, 25399746,
25404925, 25464391, 25478415, 25480015, 25632783, 25639769, 25645612, 25688228, 25724427,
25862192, 25954476, 25994479, 26008752, 26036460, 26038202, 26078874, 26118327, 26132679,
26207601, 26262418, 26270737, 26274860, 26431248, 26434268, 26562736, 26580134, 26593740,
26618561, 26844181, 26866971, 26907883, 27005270, 27023584, 27024044, 27057184, 23061395,
23077554, 23138588, 23185172, 23282479, 23290830, 23316844, 23386911, 23641319, 23677058,
23742930, 25350389, 25399746, 25404925, 25464391, 25478415, 25480015, 25632783, 25639769,
25645612, 25688228, 25724427, 25862192, 25954476, 25994479, 26008752, 26036460, 26038202,
26078874, 26118327, 26132679, 26207601, 26262418, 26270737, 26274860, 26431248, 26434268,
26562736, 26580134, 26593740, 26618561, 26844181, 26866971, 26907883, 27005270, 27023584,
27024044, 27057184, 27088034, 27088550, 27302898, 27353925, 27412984, 27488633, 27514155,
27558052, 27601937, 27606339, 27624514, 27680396, 27684064, 27963602, 27414982, 28450673
];
}
fn bucket_benchmark(c: &mut Criterion) {
c.bench(
"bucket",
Benchmark::new("write", |b| {
let bucket = AtomicBucket::new();
b.iter(|| {
for value in RANDOM_INTS.iter() {
bucket.push(value);
}
})
})
.throughput(Throughput::Elements(RANDOM_INTS.len() as u32)),
);
}
criterion_group!(benches, bucket_benchmark);
criterion_main!(benches);

View File

@ -0,0 +1,104 @@
#[macro_use]
extern crate criterion;
#[macro_use]
extern crate lazy_static;
use criterion::{Benchmark, Criterion, Throughput};
use metrics_util::StreamingIntegers;
use rand::{
distributions::{Distribution, Gamma},
rngs::SmallRng,
Rng, SeedableRng,
};
use std::time::Duration;
lazy_static! {
static ref NORMAL_SMALL: Vec<u64> = get_gamma_distribution(100, Duration::from_millis(200));
static ref NORMAL_MEDIUM: Vec<u64> = get_gamma_distribution(10000, Duration::from_millis(200));
static ref NORMAL_LARGE: Vec<u64> = get_gamma_distribution(1000000, Duration::from_millis(200));
static ref LINEAR_SMALL: Vec<u64> = get_linear_distribution(100);
static ref LINEAR_MEDIUM: Vec<u64> = get_linear_distribution(10000);
static ref LINEAR_LARGE: Vec<u64> = get_linear_distribution(1000000);
}
fn get_gamma_distribution(len: usize, upper_bound: Duration) -> Vec<u64> {
// Start with a seeded RNG so that we predictably regenerate our data.
let mut rng = SmallRng::seed_from_u64(len as u64);
// This Gamma distribution gets us pretty close to a typical web server response time
// distribution where there's a big peak down low, and a long tail that drops off sharply.
let gamma = Gamma::new(1.75, 1.0);
// Scale all the values by 22 million to simulate a lower bound of 22ms (but in
// nanoseconds) for all generated values.
gamma
.sample_iter(&mut rng)
.map(|n| n * upper_bound.as_nanos() as f64)
.map(|n| n as u64)
.take(len)
.collect::<Vec<u64>>()
}
fn get_linear_distribution(len: usize) -> Vec<u64> {
let mut values = Vec::new();
for i in 0..len as u64 {
values.push(i);
}
values
}
macro_rules! define_basic_benches {
($c:ident, $name:expr, $input:ident) => {
$c.bench(
$name,
Benchmark::new("compress", |b| {
b.iter_with_large_drop(|| {
let mut si = StreamingIntegers::new();
si.compress(&$input);
si
})
})
.with_function("decompress", |b| {
let mut si = StreamingIntegers::new();
si.compress(&$input);
b.iter_with_large_drop(move || si.decompress())
})
.with_function("decompress + sum", |b| {
let mut si = StreamingIntegers::new();
si.compress(&$input);
b.iter_with_large_drop(move || {
let total: u64 = si.decompress().iter().sum();
total
})
})
.with_function("decompress_with + sum", |b| {
let mut si = StreamingIntegers::new();
si.compress(&$input);
b.iter(move || {
let mut total = 0;
si.decompress_with(|batch| {
let batch_total: u64 = batch.iter().sum();
total += batch_total;
});
});
})
.throughput(Throughput::Elements($input.len() as u32)),
)
};
}
fn streaming_integer_benchmark(c: &mut Criterion) {
define_basic_benches!(c, "normal small", NORMAL_SMALL);
define_basic_benches!(c, "normal medium", NORMAL_MEDIUM);
define_basic_benches!(c, "normal large", NORMAL_LARGE);
define_basic_benches!(c, "linear small", LINEAR_SMALL);
define_basic_benches!(c, "linear medium", LINEAR_MEDIUM);
define_basic_benches!(c, "linear large", LINEAR_LARGE);
}
criterion_group!(benches, streaming_integer_benchmark);
criterion_main!(benches);

444
metrics-util/src/bucket.rs Normal file
View File

@ -0,0 +1,444 @@
use crossbeam_epoch::{pin as epoch_pin, Atomic, Guard, Owned, Shared};
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{mem, slice};
const BLOCK_SIZE: usize = 128;
/// Discrete chunk of values with atomic read/write access.
struct Block<T> {
// Write index.
write: AtomicUsize,
// Read index.
read: AtomicUsize,
// The individual slots.
slots: [UnsafeCell<T>; BLOCK_SIZE],
// The next block before this one.
prev: Atomic<Block<T>>,
}
impl<T> Block<T> {
/// Creates a new [`Block`].
pub fn new() -> Self {
Block {
write: AtomicUsize::new(0),
read: AtomicUsize::new(0),
slots: unsafe { mem::zeroed() },
prev: Atomic::null(),
}
}
/// Gets the current length of this block.
pub fn len(&self) -> usize {
self.read.load(Ordering::Acquire)
}
/// Gets a slice of the data written to this block.
pub fn data(&self) -> &[T] {
let len = self.len();
let head = self.slots[0].get();
unsafe { slice::from_raw_parts(head as *const T, len) }
}
/// Links this block to the previous block in the bucket.
pub fn set_prev(&self, prev: Shared<Block<T>>, guard: &Guard) {
match self
.prev
.compare_and_set(Shared::null(), prev, Ordering::AcqRel, guard)
{
Ok(_) => {}
Err(_) => unreachable!(),
}
}
/// Pushes a value into this block.
pub fn push(&self, value: T) -> Result<(), T> {
// Try to increment the index. If we've reached the end of the block, let the bucket know
// so it can attach another block.
let index = self.write.fetch_add(1, Ordering::AcqRel);
if index >= BLOCK_SIZE {
return Err(value);
}
// Update the slot.
unsafe {
self.slots[index].get().write(value);
}
// Scoot our read index forward.
self.read.fetch_add(1, Ordering::AcqRel);
Ok(())
}
}
unsafe impl<T> Send for Block<T> {}
unsafe impl<T> Sync for Block<T> {}
impl<T> Drop for Block<T> {
fn drop(&mut self) {
let guard = &epoch_pin();
let prev = self.prev.swap(Shared::null(), Ordering::AcqRel, guard);
if !prev.is_null() {
unsafe {
guard.defer_destroy(prev);
}
guard.flush();
}
}
}
/// An atomic bucket with snapshot capabilities.
///
/// This bucket is implemented as a singly-linked list of blocks, where each block is a small
/// buffer that can hold a handful of elements. There is no limit to how many elements can be in
/// the bucket at a time. Blocks are dynamically allocated as elements are pushed into the bucket.
///
/// Unlike a queue, buckets cannot be drained element by element: callers must iterate the whole
/// structure. Reading the bucket happens in reverse, to allow writers to make forward progress
/// without affecting the iteration of the previously-written values.
///
/// The bucket can be cleared while a concurrent snapshot is taking place, and will not affect the
/// reader.
#[derive(Debug)]
pub struct AtomicBucket<T> {
tail: Atomic<Block<T>>,
}
impl<T> AtomicBucket<T> {
/// Creates a new, empty bucket.
pub fn new() -> Self {
AtomicBucket {
tail: Atomic::null(),
}
}
/// Pushes an element into the bucket.
pub fn push(&self, value: T) {
let mut original = value;
loop {
// Load the tail block, or install a new one.
let guard = &epoch_pin();
let mut tail = self.tail.load(Ordering::Acquire, guard);
if tail.is_null() {
// No blocks at all yet. We need to create one.
match self.tail.compare_and_set(
Shared::null(),
Owned::new(Block::new()),
Ordering::AcqRel,
guard,
) {
// We won the race to install the new block.
Ok(ptr) => tail = ptr,
// Somebody else beat us, so just update our pointer.
Err(e) => tail = e.current,
}
}
// We have a block now, so we need to try writing to it.
let tail_block = unsafe { tail.deref() };
match tail_block.push(original) {
// If the push was OK, then the block wasn't full. It might _now_ be full, but we'll
// let future callers deal with installing a new block if necessary.
Ok(_) => return,
// The block was full, so we've been given the value back and we need to install a new block.
Err(value) => {
match self.tail.compare_and_set(
tail,
Owned::new(Block::new()),
Ordering::AcqRel,
guard,
) {
// We managed to install the block, so we need to link this new block to
// the previous block.
Ok(ptr) => {
let new_tail = unsafe { ptr.deref() };
new_tail.set_prev(tail, guard);
// Now push into our new block.
match new_tail.push(value) {
// We wrote the value successfully, so we're good here!
Ok(_) => return,
// The block was full, so just loop and start over.
Err(value) => {
original = value;
continue;
}
}
}
// Somebody else installed the block before us, so let's just start over.
Err(_) => {
original = value;
continue;
}
}
}
}
}
}
/// Collects all of the elements written to the bucket.
///
/// This operation can be slow as it involves allocating enough space to hold all of the
/// elements within the bucket. Consider [`data_with`](AtomicBucket::data_with) to incrementally iterate
/// the internal blocks within the bucket.
///
/// Elements are in partial reverse order: blocks are iterated in reverse order, but the
/// elements within them will appear in their original order.
pub fn data(&self) -> Vec<T>
where
T: Clone,
{
let mut values = Vec::new();
self.data_with(|block| values.extend_from_slice(block));
values
}
/// Iterates all of the elements written to the bucket, invoking `f` for each block.
///
/// Elements are in partial reverse order: blocks are iterated in reverse order, but the
/// elements within them will appear in their original order.
pub fn data_with<F>(&self, mut f: F)
where
F: FnMut(&[T]),
{
let guard = &epoch_pin();
// While we have a valid block -- either `tail` or the next block as we keep reading -- we
// load the data from each block and process it by calling `f`.
let mut block_ptr = self.tail.load(Ordering::Acquire, guard);
while !block_ptr.is_null() {
let block = unsafe { block_ptr.deref() };
// Read the data out of the block.
let data = block.data();
f(data);
// Load the next block.
block_ptr = block.prev.load(Ordering::Acquire, guard);
}
}
/// Clears the bucket.
///
/// Deallocation of the internal blocks happens only when all readers have finished, and so
/// will not necessarily occur during or immediately preceding this method.
///
/// # Note
/// This method will not affect reads that are already in progress.
pub fn clear(&self) {
// We simply swap the tail pointer which effectively clears the bucket. Callers might
// still be in process of writing to the tail node, or reading the data, but new callers
// will see it as empty until another write proceeds.
let guard = &epoch_pin();
let tail = self.tail.load(Ordering::Acquire, guard);
if !tail.is_null() {
if self
.tail
.compare_and_set(tail, Shared::null(), Ordering::SeqCst, guard)
.is_ok()
{
// We won the swap to delete the tail node. Now configure a deferred drop to clean
// things up once nobody else is using it.
unsafe {
// Drop the block, which will cause a cascading drop on the next block, and
// so on and so forth, until all blocks linked to this one are dropped.
guard.defer_destroy(tail);
}
guard.flush();
}
}
}
}
#[cfg(test)]
mod tests {
use super::{AtomicBucket, Block, BLOCK_SIZE};
#[test]
fn test_create_new_block() {
let block: Block<u64> = Block::new();
assert_eq!(block.len(), 0);
let data = block.data();
assert_eq!(data.len(), 0);
}
#[test]
fn test_block_write_then_read() {
let block = Block::new();
assert_eq!(block.len(), 0);
let data = block.data();
assert_eq!(data.len(), 0);
let result = block.push(42);
assert!(result.is_ok());
let data = block.data();
assert_eq!(data.len(), 1);
assert_eq!(data[0], 42);
}
#[test]
fn test_block_write_until_full_then_read() {
let block = Block::new();
assert_eq!(block.len(), 0);
let data = block.data();
assert_eq!(data.len(), 0);
let mut i = 0;
let mut total = 0;
while i < BLOCK_SIZE as u64 {
assert!(block.push(i).is_ok());
total += i;
i += 1;
}
let data = block.data();
assert_eq!(data.len(), BLOCK_SIZE);
let sum: u64 = data.iter().sum();
assert_eq!(sum, total);
let result = block.push(42);
assert!(result.is_err());
}
#[test]
fn test_block_write_until_full_then_read_mt() {
let block = Block::new();
assert_eq!(block.len(), 0);
let data = block.data();
assert_eq!(data.len(), 0);
let res = crossbeam::scope(|s| {
let t1 = s.spawn(|_| {
let mut i = 0;
let mut total = 0;
while i < BLOCK_SIZE as u64 / 2 {
assert!(block.push(i).is_ok());
total += i;
i += 1;
}
total
});
let t2 = s.spawn(|_| {
let mut i = 0;
let mut total = 0;
while i < BLOCK_SIZE as u64 / 2 {
assert!(block.push(i).is_ok());
total += i;
i += 1;
}
total
});
let t1_total = t1.join().unwrap();
let t2_total = t2.join().unwrap();
t1_total + t2_total
});
let total = res.unwrap();
let data = block.data();
assert_eq!(data.len(), BLOCK_SIZE);
let sum: u64 = data.iter().sum();
assert_eq!(sum, total);
let result = block.push(42);
assert!(result.is_err());
}
#[test]
fn test_bucket_write_then_read() {
let bucket = AtomicBucket::new();
bucket.push(42);
let snapshot = bucket.data();
assert_eq!(snapshot.len(), 1);
assert_eq!(snapshot[0], 42);
}
#[test]
fn test_bucket_multiple_blocks_write_then_read() {
let bucket = AtomicBucket::new();
let snapshot = bucket.data();
assert_eq!(snapshot.len(), 0);
let target = (BLOCK_SIZE * 3 + BLOCK_SIZE / 2) as u64;
let mut i = 0;
let mut total = 0;
while i < target {
bucket.push(i);
total += i;
i += 1;
}
let snapshot = bucket.data();
assert_eq!(snapshot.len(), target as usize);
let sum: u64 = snapshot.iter().sum();
assert_eq!(sum, total);
}
#[test]
fn test_bucket_write_then_read_mt() {
let bucket = AtomicBucket::new();
let snapshot = bucket.data();
assert_eq!(snapshot.len(), 0);
let res = crossbeam::scope(|s| {
let t1 = s.spawn(|_| {
let mut i = 0;
let mut total = 0;
while i < BLOCK_SIZE as u64 * 10_000 {
bucket.push(i);
total += i;
i += 1;
}
total
});
let t2 = s.spawn(|_| {
let mut i = 0;
let mut total = 0;
while i < BLOCK_SIZE as u64 * 10_000 {
bucket.push(i);
total += i;
i += 1;
}
total
});
let t1_total = t1.join().unwrap();
let t2_total = t2.join().unwrap();
t1_total + t2_total
});
let total = res.unwrap();
let snapshot = bucket.data();
assert_eq!(snapshot.len(), BLOCK_SIZE * 20_000);
let sum: u64 = snapshot.iter().sum();
assert_eq!(sum, total);
}
}

View File

@ -1,45 +1,9 @@
//! Helper types and functions used within the metrics ecosystem.
mod bucket;
pub use bucket::AtomicBucket;
/// A quantile that has both the raw value and a human-friendly display label.
#[derive(Clone)]
pub struct Quantile(f64, String);
mod streaming;
pub use streaming::StreamingIntegers;
impl Quantile {
/// Creates a new `Quantile` from a floating-point value.
///
/// All values clamped between 0.0 and 1.0.
pub fn new(quantile: f64) -> Quantile {
let clamped = quantile.max(0.0);
let clamped = clamped.min(1.0);
let display = clamped * 100.0;
let raw_label = format!("{}", clamped);
let label = match raw_label.as_str() {
"0" => "min".to_string(),
"1" => "max".to_string(),
_ => {
let raw = format!("p{}", display);
raw.replace(".", "")
},
};
Quantile(clamped, label)
}
/// Gets the human-friendly display label for this quantile.
pub fn label(&self) -> &str {
self.1.as_str()
}
/// Gets the raw value for this quantile.
pub fn value(&self) -> f64 {
self.0
}
}
/// Parses a list of floating-point values into a list of `Quantile`s.
pub fn parse_quantiles(quantiles: &[f64]) -> Vec<Quantile> {
quantiles.iter()
.map(|f| Quantile::new(*f))
.collect()
}
mod quantile;
pub use quantile::{parse_quantiles, Quantile};

View File

@ -0,0 +1,102 @@
/// A quantile that has both the raw value and a human-friendly display label.
///
/// We work with quantiles for optimal floating-point precison over percentiles, but most of the
/// time, monitoring systems show us percentiles, and usually in an abbreviated form: `p99`.
///
/// On top of holding the quantile value, we calculate the familiar "p99" style of label, doing the
/// appropriate percentile conversion. Thus, if you have a quantile of `0.99`, the resulting label
/// is `p99`, and if you have a quantile of `0.999`, the resulting label is `p999`.
///
/// There are two special cases, where we label `0.0` and `1.0` as `min` and `max`, respectively.
#[derive(Debug, Clone, PartialEq)]
pub struct Quantile(f64, String);
impl Quantile {
/// Creates a new [`Quantile`] from a floating-point value.
///
/// All values are clamped between 0.0 and 1.0.
pub fn new(quantile: f64) -> Quantile {
let clamped = quantile.max(0.0);
let clamped = clamped.min(1.0);
let display = clamped * 100.0;
let raw_label = format!("{}", clamped);
let label = match raw_label.as_str() {
"0" => "min".to_string(),
"1" => "max".to_string(),
_ => {
let raw = format!("p{}", display);
raw.replace(".", "")
}
};
Quantile(clamped, label)
}
/// Gets the human-friendly display label.
pub fn label(&self) -> &str {
self.1.as_str()
}
/// Gets the raw quantile value.
pub fn value(&self) -> f64 {
self.0
}
}
/// Parses a slice of floating-point values into a vector of [`Quantile`]s.
pub fn parse_quantiles(quantiles: &[f64]) -> Vec<Quantile> {
quantiles.iter().map(|f| Quantile::new(*f)).collect()
}
#[cfg(test)]
mod tests {
use super::{parse_quantiles, Quantile};
#[test]
fn test_quantiles() {
let min = Quantile::new(0.0);
assert_eq!(min.value(), 0.0);
assert_eq!(min.label(), "min");
let max = Quantile::new(1.0);
assert_eq!(max.value(), 1.0);
assert_eq!(max.label(), "max");
let p99 = Quantile::new(0.99);
assert_eq!(p99.value(), 0.99);
assert_eq!(p99.label(), "p99");
let p999 = Quantile::new(0.999);
assert_eq!(p999.value(), 0.999);
assert_eq!(p999.label(), "p999");
let p9999 = Quantile::new(0.9999);
assert_eq!(p9999.value(), 0.9999);
assert_eq!(p9999.label(), "p9999");
let under = Quantile::new(-1.0);
assert_eq!(under.value(), 0.0);
assert_eq!(under.label(), "min");
let over = Quantile::new(1.2);
assert_eq!(over.value(), 1.0);
assert_eq!(over.label(), "max");
}
#[test]
fn test_parse_quantiles() {
let empty = vec![];
let result = parse_quantiles(&empty);
assert_eq!(result.len(), 0);
let normal = vec![0.0, 0.5, 0.99, 0.999, 1.0];
let result = parse_quantiles(&normal);
assert_eq!(result.len(), 5);
assert_eq!(result[0], Quantile::new(0.0));
assert_eq!(result[1], Quantile::new(0.5));
assert_eq!(result[2], Quantile::new(0.99));
assert_eq!(result[3], Quantile::new(0.999));
assert_eq!(result[4], Quantile::new(1.0));
}
}

View File

@ -0,0 +1,295 @@
use std::slice;
/// A compressed set of integers.
///
/// For some workloads, working with a large set of integers can require an outsized amount of
/// memory for numbers that are very similar. This data structure takes chunks of integers and
/// compresses then by using delta encoding and variable-byte encoding.
///
/// Delta encoding tracks the difference between successive integers: if you have 1000000 and
/// 1000001, the difference between the two is only 1. Coupled with variable-byte encoding, we can
/// compress those two numbers within 4 bytes, where normally they would require a minimum of 8
/// bytes if they were 32-bit integers, or 16 bytes if they were 64-bit integers. Over large runs
/// of integers where the delta is relatively small compared to the original value, the compression
/// savings add up quickly.
///
/// The original integers can be decompressed and collected, or can be decompressed on-the-fly
/// while passing them to a given function, allowing callers to observe the integers without
/// allocating the entire size of the decompressed set.
///
/// # Performance
/// As this is a scalar implementation, performance depends heavily on not only the input size, but
/// also the delta between values, as well as whether or not the decompressed values are being
/// collected or used on-the-fly.
///
/// Bigger deltas between values means longer variable-byte sizes which is hard for the CPU to
/// predict. As the linear benchemarks show, things are much faster when the delta between values
/// is minimal.
///
/// These figures were generated on a 2015 Macbook Pro (Core i7, 2.2GHz base/3.7GHz turbo).
///
/// | | compress (1) | decompress (2) | decompress/sum (3) | decompress_with/sum (4) |
/// |------------------------|--------------|----------------|--------------------|-------------------------|
/// | normal, 100 values | 94 Melem/s | 76 Melem/s | 71 Melem/s | 126 Melem/s |
/// | normal, 10000 values | 92 Melem/s | 85 Melem/s | 109 Melem/s | 109 Melem/s |
/// | normal, 1000000 values | 86 Melem/s | 79 Melem/s | 68 Melem/s | 110 Melem/s |
/// | linear, 100 values | 334 Melem/s | 109 Melem/s | 110 Melem/s | 297 Melem/s |
/// | linear, 10000 values | 654 Melem/s | 174 Melem/s | 374 Melem/s | 390 Melem/s |
/// | linear, 1000000 values | 703 Melem/s | 180 Melem/s | 132 Melem/s | 392 Melem/s |
///
/// The normal values consistent of an approximation of real nanosecond-based timing measurements
/// of a web service. The linear values are simply sequential integers ranging from 0 to the
/// configured size of the test run.
///
/// Operations:
/// 1. simply compress the input set, no decompression
/// 2. decompress the entire compressed set into a single vector
/// 3. same as #2 but sum all of the original values at the end
/// 4. use `decompress_with` to sum the numbers incrementally
#[derive(Debug, Default, Clone)]
pub struct StreamingIntegers {
inner: Vec<u8>,
len: usize,
last: Option<i64>,
}
impl StreamingIntegers {
/// Creates a new, empty streaming set.
pub fn new() -> Self {
Default::default()
}
/// Returns the number of elements in the set, also referred to as its 'length'.
pub fn len(&self) -> usize {
self.len
}
/// Returns `true` if the set contains no elements.
pub fn is_empty(&self) -> bool {
self.len == 0
}
/// Compresses a slice of integers, and adds them to the set.
pub fn compress(&mut self, src: &[u64]) {
let src_len = src.len();
if src_len == 0 {
return;
}
self.len += src_len;
// Technically, 64-bit integers can take up to 10 bytes when encoded as variable integers
// if they're at the maximum size, so we need to properly allocate here. As we directly
// operate on a mutable slice of the inner buffer below, we _can't_ afford to lazily
// allocate or guess at the resulting compression, otherwise we'll get a panic at runtime
// for bounds checks.
//
// TODO: we should try and add some heuristic here, because we're potentially
// overallocating by a lot when we plan for the worst case scenario
self.inner.reserve(src_len * 10);
let mut buf_idx = self.inner.len();
let buf_cap = self.inner.capacity();
let mut buf = unsafe {
let buf_ptr = self.inner.as_mut_ptr();
slice::from_raw_parts_mut(buf_ptr, buf_cap)
};
// If we have no last value, then the very first integer we write is the full value and not
// a delta value.
let mut src_idx = 0;
if self.last.is_none() {
let first = src[src_idx] as i64;
self.last = Some(first);
let zigzag = zigzag_encode(first);
buf_idx = vbyte_encode(zigzag, &mut buf, buf_idx);
src_idx += 1;
}
// Set up for our actual compression run.
let mut last = self.last.unwrap();
while src_idx < src_len {
let value = src[src_idx] as i64;
let diff = value - last;
let zigzag = zigzag_encode(diff);
buf_idx = vbyte_encode(zigzag, &mut buf, buf_idx);
last = value;
src_idx += 1;
}
unsafe {
self.inner.set_len(buf_idx);
}
self.last = Some(last);
}
/// Decompresses all of the integers written to the set.
///
/// Returns a vector with all of the original values. For larger sets of integers, this can be
/// slow due to the allocation required. Consider [decompress_with] to incrementally iterate
/// the decompresset set in smaller chunks.
///
/// [decompress_with]: StreamingIntegers::decompress_with
pub fn decompress(&self) -> Vec<u64> {
let mut values = Vec::new();
let mut buf_idx = 0;
let buf_len = self.inner.len();
let buf = self.inner.as_slice();
let mut last = 0;
while buf_idx < buf_len {
let (value, new_idx) = vbyte_decode(&buf, buf_idx);
buf_idx = new_idx;
let delta = zigzag_decode(value);
let original = last + delta;
last = original;
values.push(original as u64);
}
values
}
/// Decompresses all of the integers written to the set, invoking `f` for each batch.
///
/// During decompression, values are batched internally until a limit is reached, and then `f`
/// is called with a reference to the batch. This leads to minimal allocation to decompress
/// the entire set, for use cases where the values can be observed incrementally without issue.
pub fn decompress_with<F>(&self, mut f: F)
where
F: FnMut(&[u64]),
{
let mut values = Vec::with_capacity(1024);
let mut buf_idx = 0;
let buf_len = self.inner.len();
let buf = self.inner.as_slice();
let mut last = 0;
while buf_idx < buf_len {
let (value, new_idx) = vbyte_decode(&buf, buf_idx);
buf_idx = new_idx;
let delta = zigzag_decode(value);
let original = last + delta;
last = original;
values.push(original as u64);
if values.len() == values.capacity() {
f(&values);
values.clear();
}
}
if !values.is_empty() {
f(&values);
}
}
}
#[inline]
fn zigzag_encode(input: i64) -> u64 {
((input << 1) ^ (input >> 63)) as u64
}
#[inline]
fn zigzag_decode(input: u64) -> i64 {
((input >> 1) as i64) ^ (-((input & 1) as i64))
}
#[inline]
fn vbyte_encode(mut input: u64, buf: &mut [u8], mut buf_idx: usize) -> usize {
while input >= 128 {
buf[buf_idx] = 0x80 as u8 | (input as u8 & 0x7F);
buf_idx += 1;
input >>= 7;
}
buf[buf_idx] = input as u8;
buf_idx + 1
}
#[inline]
fn vbyte_decode(buf: &[u8], mut buf_idx: usize) -> (u64, usize) {
let mut tmp = 0;
let mut factor = 0;
loop {
tmp |= u64::from(buf[buf_idx] & 0x7F) << (7 * factor);
if buf[buf_idx] & 0x80 != 0x80 {
return (tmp, buf_idx + 1);
}
buf_idx += 1;
factor += 1;
}
}
#[cfg(test)]
mod tests {
use super::StreamingIntegers;
#[test]
fn test_streaming_integers_new() {
let si = StreamingIntegers::new();
let decompressed = si.decompress();
assert_eq!(decompressed.len(), 0);
}
#[test]
fn test_streaming_integers_single_block() {
let mut si = StreamingIntegers::new();
let decompressed = si.decompress();
assert_eq!(decompressed.len(), 0);
let values = vec![8, 6, 7, 5, 3, 0, 9];
si.compress(&values);
let decompressed = si.decompress();
assert_eq!(decompressed, values);
}
#[test]
fn test_streaming_integers_multiple_blocks() {
let mut si = StreamingIntegers::new();
let decompressed = si.decompress();
assert_eq!(decompressed.len(), 0);
let values = vec![8, 6, 7, 5, 3, 0, 9];
si.compress(&values);
let values2 = vec![6, 6, 6];
si.compress(&values2);
let values3 = vec![];
si.compress(&values3);
let values4 = vec![6, 6, 6, 7, 7, 7, 8, 8, 8];
si.compress(&values4);
let total = vec![values, values2, values3, values4]
.into_iter()
.flatten()
.collect::<Vec<_>>();
let decompressed = si.decompress();
assert_eq!(decompressed, total);
}
#[test]
fn test_streaming_integers_empty_block() {
let mut si = StreamingIntegers::new();
let decompressed = si.decompress();
assert_eq!(decompressed.len(), 0);
let values = vec![];
si.compress(&values);
let decompressed = si.decompress();
assert_eq!(decompressed.len(), 0);
}
}