metrics/metrics-util/src/bucket.rs

use crossbeam_epoch::{pin as epoch_pin, Atomic, Guard, Owned, Shared};
use std::{
    cell::UnsafeCell,
    mem, slice,
    sync::atomic::{AtomicUsize, Ordering},
};
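
// The number of slots in each block matches the bit width of `usize`, since the `read` field of
// `Block` tracks slot occupancy as a one-bit-per-slot bitmap held in a single `AtomicUsize`.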
#[cfg(target_pointer_width = "16")]
const BLOCK_SIZE: usize = 16;
#[cfg(target_pointer_width = "32")]
const BLOCK_SIZE: usize = 32;
#[cfg(target_pointer_width = "64")]
const BLOCK_SIZE: usize = 64;

/// Discrete chunk of values with atomic read/write access.
struct Block<T> {
    // Write index.
    write: AtomicUsize,

    // Read bitmap.
    read: AtomicUsize,

    // The individual slots.
    slots: [UnsafeCell<T>; BLOCK_SIZE],

    // The previous block in the chain.
    prev: Atomic<Block<T>>,
}

impl<T> Block<T> {
    /// Creates a new [`Block`].
    pub fn new() -> Self {
        Block {
            write: AtomicUsize::new(0),
            read: AtomicUsize::new(0),
            // SAFETY: this zero-initializes the slots, which assumes that an all-zero bit
            // pattern is a valid `T`. Slots past `len()` are never exposed to readers, since
            // `len()` only counts slots whose writes have completed.
            slots: unsafe { mem::zeroed() },
            prev: Atomic::null(),
        }
    }

    /// Gets the current length of this block.
    pub fn len(&self) -> usize {
        // Bit N of `read` is set once slot N has been fully written, so the number of trailing
        // ones is the length of the contiguous, fully-initialized prefix of `slots`.
        self.read.load(Ordering::Acquire).trailing_ones() as usize
    }

    /// Gets a slice of the data written to this block.
    pub fn data(&self) -> &[T] {
        let len = self.len();
        let head = self.slots[0].get();
        unsafe { slice::from_raw_parts(head as *const T, len) }
    }

    /// Links this block to the previous block in the bucket.
    pub fn set_prev(&self, prev: Shared<Block<T>>, guard: &Guard) {
        // Only the thread that installed this block ever links it, and it does so exactly once,
        // so the pointer must still be null here.
        match self
            .prev
            .compare_and_set(Shared::null(), prev, Ordering::AcqRel, guard)
        {
            Ok(_) => {}
            Err(_) => unreachable!(),
        }
    }

    /// Pushes a value into this block.
    pub fn push(&self, value: T) -> Result<(), T> {
        // Try to increment the write index. If we've reached the end of the block, give the
        // value back so the bucket can attach another block.
        let index = self.write.fetch_add(1, Ordering::AcqRel);
        if index >= BLOCK_SIZE {
            return Err(value);
        }

        // Update the slot.
        unsafe {
            self.slots[index].get().write(value);
        }

        // Mark the slot as readable in the read bitmap.
        self.read.fetch_or(1 << index, Ordering::AcqRel);

        Ok(())
    }
}

// SAFETY: values are moved in by `push` through `&self` and handed out as `&[T]` by `data`, so
// `Block<T>` is only safe to send or share across threads when `T` itself is.
unsafe impl<T: Send> Send for Block<T> {}
unsafe impl<T: Send + Sync> Sync for Block<T> {}

impl<T> Drop for Block<T> {
    fn drop(&mut self) {
        let guard = &epoch_pin();

        // Detach our predecessor and defer its destruction until all readers are done with it.
        // This cascades: destroying the predecessor defers destruction of its own predecessor,
        // and so on down the chain.
        let prev = self.prev.swap(Shared::null(), Ordering::AcqRel, guard);
        if !prev.is_null() {
            unsafe {
                guard.defer_destroy(prev);
            }
            guard.flush();
        }
    }
}

/// An atomic bucket with snapshot capabilities.
///
/// This bucket is implemented as a singly-linked list of blocks, where each block is a small
/// buffer that can hold a handful of elements. There is no limit to how many elements can be in
/// the bucket at a time. Blocks are dynamically allocated as elements are pushed into the bucket.
///
/// Unlike a queue, buckets cannot be drained element by element: callers must iterate the whole
/// structure. Reading the bucket happens in reverse, to allow writers to make forward progress
/// without affecting the iteration of the previously-written values.
///
/// The bucket can be cleared while a concurrent snapshot is taking place, and clearing will not
/// affect the reader.
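///
/// # Examples
///
/// A minimal usage sketch (this assumes `AtomicBucket` is re-exported at the crate root as
/// `metrics_util::AtomicBucket`):
///
/// ```
/// use metrics_util::AtomicBucket;
///
/// let bucket = AtomicBucket::new();
/// bucket.push(42u64);
/// bucket.push(43);
///
/// // Snapshots see everything pushed so far.
/// let values = bucket.data();
/// assert_eq!(values.len(), 2);
///
/// // Clearing empties the bucket for subsequent readers.
/// bucket.clear();
/// assert!(bucket.data().is_empty());
/// ```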
#[derive(Debug)]
pub struct AtomicBucket<T> {
    tail: Atomic<Block<T>>,
}

impl<T> AtomicBucket<T> {
    /// Creates a new, empty bucket.
    pub fn new() -> Self {
        AtomicBucket {
            tail: Atomic::null(),
        }
    }

    /// Pushes an element into the bucket.
    pub fn push(&self, value: T) {
        let mut original = value;
        loop {
            // Load the tail block, or install a new one.
            let guard = &epoch_pin();
            let mut tail = self.tail.load(Ordering::Acquire, guard);
            if tail.is_null() {
                // No blocks at all yet. We need to create one.
                match self.tail.compare_and_set(
                    Shared::null(),
                    Owned::new(Block::new()),
                    Ordering::AcqRel,
                    guard,
                ) {
                    // We won the race to install the new block.
                    Ok(ptr) => tail = ptr,
                    // Somebody else beat us, so just update our pointer.
                    Err(e) => tail = e.current,
                }
            }

            // We have a block now, so we need to try writing to it.
            let tail_block = unsafe { tail.deref() };
            match tail_block.push(original) {
                // If the push was OK, then the block wasn't full. It might _now_ be full, but
                // we'll let future callers deal with installing a new block if necessary.
                Ok(_) => return,
                // The block was full, so we've been given the value back and we need to install
                // a new block.
                Err(value) => {
                    match self.tail.compare_and_set(
                        tail,
                        Owned::new(Block::new()),
                        Ordering::AcqRel,
                        guard,
                    ) {
                        // We managed to install the block, so we need to link this new block to
                        // the previous block.
                        Ok(ptr) => {
                            let new_tail = unsafe { ptr.deref() };
                            new_tail.set_prev(tail, guard);

                            // Now push into our new block.
                            match new_tail.push(value) {
                                // We wrote the value successfully, so we're good here!
                                Ok(_) => return,
                                // The block was full, so just loop and start over.
                                Err(value) => {
                                    original = value;
                                    continue;
                                }
                            }
                        }
                        // Somebody else installed the block before us, so let's just start over.
                        Err(_) => {
                            original = value;
                            continue;
                        }
                    }
                }
            }
        }
    }

    /// Collects all of the elements written to the bucket.
    ///
    /// This operation can be slow as it involves allocating enough space to hold all of the
    /// elements within the bucket. Consider using [`data_with`](AtomicBucket::data_with) to
    /// incrementally iterate the internal blocks within the bucket instead.
    ///
    /// Elements are in partial reverse order: blocks are iterated in reverse order, but the
    /// elements within them will appear in their original order.
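    ///
    /// # Examples
    ///
    /// A sketch of the ordering behavior, assuming the 64-slot blocks used on 64-bit targets:
    ///
    /// ```
    /// use metrics_util::AtomicBucket;
    ///
    /// let bucket = AtomicBucket::new();
    /// for i in 0..65u64 {
    ///     bucket.push(i);
    /// }
    ///
    /// // The newest block holds only `64` and is visited first; the values in the older,
    /// // full block then follow in their original order.
    /// let values = bucket.data();
    /// assert_eq!(values[0], 64);
    /// assert_eq!(values[1..], (0..64u64).collect::<Vec<_>>()[..]);
    /// ```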
    pub fn data(&self) -> Vec<T>
    where
        T: Clone,
    {
        let mut values = Vec::new();
        self.data_with(|block| values.extend_from_slice(block));
        values
    }

    /// Iterates all of the elements written to the bucket, invoking `f` once for each block.
    ///
    /// Elements are in partial reverse order: blocks are iterated in reverse order, but the
    /// elements within them will appear in their original order.
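    ///
    /// # Examples
    ///
    /// A minimal sketch of allocation-free accumulation (again assuming the crate-root
    /// re-export `metrics_util::AtomicBucket`):
    ///
    /// ```
    /// use metrics_util::AtomicBucket;
    ///
    /// let bucket = AtomicBucket::new();
    /// bucket.push(1u64);
    /// bucket.push(2);
    ///
    /// // Sum the values block by block, without collecting them into a `Vec` first.
    /// let mut sum = 0;
    /// bucket.data_with(|block| sum += block.iter().sum::<u64>());
    /// assert_eq!(sum, 3);
    /// ```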
    pub fn data_with<F>(&self, mut f: F)
    where
        F: FnMut(&[T]),
    {
        let guard = &epoch_pin();

        // While we have a valid block -- either `tail` or the next block as we keep reading --
        // we load the data from each block and process it by calling `f`.
        let mut block_ptr = self.tail.load(Ordering::Acquire, guard);
        while !block_ptr.is_null() {
            let block = unsafe { block_ptr.deref() };

            // Read the data out of the block.
            let data = block.data();
            f(data);

            // Load the next block.
            block_ptr = block.prev.load(Ordering::Acquire, guard);
        }
    }

    /// Clears the bucket.
    ///
    /// Deallocation of the internal blocks happens only once all readers have finished, and so
    /// will not necessarily occur during or immediately after this method is called.
    ///
    /// # Note
    /// This method will not affect reads that are already in progress.
    pub fn clear(&self) {
        self.clear_with(|_| {})
    }

    /// Clears the bucket, invoking `f` for every block that will be cleared.
    ///
    /// Deallocation of the internal blocks happens only once all readers have finished, and so
    /// will not necessarily occur during or immediately after this method is called.
    ///
    /// This method is useful for accumulating values and then observing them, in a way that
    /// lets the caller avoid visiting the same values again the next time the bucket is read.
    ///
    /// This allows a pattern of observing values before they're cleared, with a clear
    /// demarcation between old and new values. A similar pattern seen in the wild is a
    /// continuously-filled data structure, such as a vector, that is eventually swapped out for
    /// a new, empty one, letting the caller read all of the old values while new values are
    /// being written, over and over again.
    ///
    /// # Note
    /// This method will not affect reads that are already in progress.
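    ///
    /// # Examples
    ///
    /// A minimal read-then-reset sketch (again assuming the crate-root re-export
    /// `metrics_util::AtomicBucket`):
    ///
    /// ```
    /// use metrics_util::AtomicBucket;
    ///
    /// let bucket = AtomicBucket::new();
    /// bucket.push(1u64);
    /// bucket.push(2);
    ///
    /// // Observe and clear in one step: subsequent reads start from empty.
    /// let mut sum = 0;
    /// bucket.clear_with(|block| sum += block.iter().sum::<u64>());
    /// assert_eq!(sum, 3);
    /// assert!(bucket.data().is_empty());
    /// ```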
    pub fn clear_with<F>(&self, mut f: F)
    where
        F: FnMut(&[T]),
    {
        // We simply swap out the tail pointer, which effectively clears the bucket. Callers
        // might still be in the process of writing to the tail block, or reading the data, but
        // new callers will see the bucket as empty until another write proceeds.
        let guard = &epoch_pin();
        let tail = self.tail.load(Ordering::Acquire, guard);
        if !tail.is_null()
            && self
                .tail
                .compare_and_set(tail, Shared::null(), Ordering::SeqCst, guard)
                .is_ok()
        {
            // While we have a valid block -- either `tail` or the next block as we keep reading
            // -- we load the data from each block and process it by calling `f`.
            let mut block_ptr = tail;
            while !block_ptr.is_null() {
                let block = unsafe { block_ptr.deref() };

                // Read the data out of the block.
                let data = block.data();
                f(data);

                // Load the next block.
                block_ptr = block.prev.load(Ordering::Acquire, guard);
            }

            // Now that we're done reading the blocks, trigger a deferred destroy on the tail.
            //
            // This will cascade backwards through the internal `prev` pointers, destroying all
            // blocks associated with this tail.
            unsafe {
                guard.defer_destroy(tail);
            }
            guard.flush();
        }
    }
}

impl<T> Default for AtomicBucket<T> {
    fn default() -> Self {
        Self {
            tail: Atomic::null(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{AtomicBucket, Block, BLOCK_SIZE};
    use crossbeam_utils::thread::scope;

    #[test]
    fn test_create_new_block() {
        let block: Block<u64> = Block::new();
        assert_eq!(block.len(), 0);

        let data = block.data();
        assert_eq!(data.len(), 0);
    }

    #[test]
    fn test_block_write_then_read() {
        let block = Block::new();
        assert_eq!(block.len(), 0);

        let data = block.data();
        assert_eq!(data.len(), 0);

        let result = block.push(42);
        assert!(result.is_ok());
        assert_eq!(block.len(), 1);

        let data = block.data();
        assert_eq!(data.len(), 1);
        assert_eq!(data[0], 42);
    }

    #[test]
    fn test_block_write_until_full_then_read() {
        let block = Block::new();
        assert_eq!(block.len(), 0);

        let data = block.data();
        assert_eq!(data.len(), 0);

        let mut i = 0;
        let mut total = 0;
        while i < BLOCK_SIZE as u64 {
            assert!(block.push(i).is_ok());
            total += i;
            i += 1;
        }

        let data = block.data();
        assert_eq!(data.len(), BLOCK_SIZE);

        let sum: u64 = data.iter().sum();
        assert_eq!(sum, total);

        let result = block.push(42);
        assert!(result.is_err());
    }

    #[test]
    fn test_block_write_until_full_then_read_mt() {
        let block = Block::new();
        assert_eq!(block.len(), 0);

        let data = block.data();
        assert_eq!(data.len(), 0);

        let res = scope(|s| {
            let t1 = s.spawn(|_| {
                let mut i = 0;
                let mut total = 0;
                while i < BLOCK_SIZE as u64 / 2 {
                    assert!(block.push(i).is_ok());
                    total += i;
                    i += 1;
                }
                total
            });

            let t2 = s.spawn(|_| {
                let mut i = 0;
                let mut total = 0;
                while i < BLOCK_SIZE as u64 / 2 {
                    assert!(block.push(i).is_ok());
                    total += i;
                    i += 1;
                }
                total
            });

            let t1_total = t1.join().unwrap();
            let t2_total = t2.join().unwrap();

            t1_total + t2_total
        });

        let total = res.unwrap();
        let data = block.data();
        assert_eq!(data.len(), BLOCK_SIZE);

        let sum: u64 = data.iter().sum();
        assert_eq!(sum, total);

        let result = block.push(42);
        assert!(result.is_err());
    }

    #[test]
    fn test_bucket_write_then_read() {
        let bucket = AtomicBucket::new();
        bucket.push(42);

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 1);
        assert_eq!(snapshot[0], 42);
    }

    #[test]
    fn test_bucket_multiple_blocks_write_then_read() {
        let bucket = AtomicBucket::new();
        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 0);

        let target = (BLOCK_SIZE * 3 + BLOCK_SIZE / 2) as u64;
        let mut i = 0;
        let mut total = 0;
        while i < target {
            bucket.push(i);
            total += i;
            i += 1;
        }

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), target as usize);

        let sum: u64 = snapshot.iter().sum();
        assert_eq!(sum, total);
    }

    #[test]
    fn test_bucket_write_then_read_mt() {
        let bucket = AtomicBucket::new();
        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 0);

        let res = scope(|s| {
            let t1 = s.spawn(|_| {
                let mut i = 0;
                let mut total = 0;
                while i < BLOCK_SIZE as u64 * 100_000 {
                    bucket.push(i);
                    total += i;
                    i += 1;
                }
                total
            });

            let t2 = s.spawn(|_| {
                let mut i = 0;
                let mut total = 0;
                while i < BLOCK_SIZE as u64 * 100_000 {
                    bucket.push(i);
                    total += i;
                    i += 1;
                }
                total
            });

            let t1_total = t1.join().unwrap();
            let t2_total = t2.join().unwrap();

            t1_total + t2_total
        });

        let total = res.unwrap();
        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), BLOCK_SIZE * 200_000);

        let sum = snapshot.iter().sum::<u64>();
        assert_eq!(sum, total);
    }

    #[test]
    fn test_clear_and_clear_with() {
        let bucket = AtomicBucket::new();
        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 0);

        let mut i = 0;
        let mut total_pushed = 0;
        while i < BLOCK_SIZE * 4 {
            bucket.push(i);
            total_pushed += i;
            i += 1;
        }

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), i);

        let mut total_accumulated = 0;
        bucket.clear_with(|xs| total_accumulated += xs.iter().sum::<usize>());
        assert_eq!(total_pushed, total_accumulated);

        let snapshot = bucket.data();
        assert_eq!(snapshot.len(), 0);
    }
}