shardtree: Add the ability to avoid pruning specific checkpoints.
This commit is contained in:
parent
4d797cce13
commit
50429843ce
|
@ -7,6 +7,13 @@ and this project adheres to Rust's notion of
|
|||
|
||||
## Unreleased
|
||||
|
||||
## Added
|
||||
* `shardtree::store::ShardStore::{ensure_retained, ensured_retained_count, should_retain}`
|
||||
|
||||
## Changed
|
||||
* `shardtree::store::ShardStore::with_checkpoints` no longer takes its `self`
|
||||
reference argument as mutable.
|
||||
|
||||
## [0.2.0] - 2023-11-07
|
||||
|
||||
## Added
|
||||
|
|
|
@ -485,12 +485,12 @@ mod tests {
|
|||
ShardTree<MemoryShardStore<String, usize>, 6, 3>,
|
||||
ShardTree<MemoryShardStore<String, usize>, 6, 3>,
|
||||
) {
|
||||
let max_checkpoints = 10;
|
||||
let min_checkpoints_to_retain = 10;
|
||||
let start = Position::from(0);
|
||||
let end = start + leaves.len() as u64;
|
||||
|
||||
// Construct a tree using `ShardTree::insert_tree`.
|
||||
let mut left = ShardTree::new(MemoryShardStore::empty(), max_checkpoints);
|
||||
let mut left = ShardTree::new(MemoryShardStore::empty(), min_checkpoints_to_retain);
|
||||
if let Some(BatchInsertionResult {
|
||||
subtree,
|
||||
checkpoints,
|
||||
|
@ -503,7 +503,7 @@ mod tests {
|
|||
}
|
||||
|
||||
// Construct a tree using `ShardTree::batch_insert`.
|
||||
let mut right = ShardTree::new(MemoryShardStore::empty(), max_checkpoints);
|
||||
let mut right = ShardTree::new(MemoryShardStore::empty(), min_checkpoints_to_retain);
|
||||
right.batch_insert(start, leaves.into_iter()).unwrap();
|
||||
|
||||
(left, right)
|
||||
|
|
|
@ -66,8 +66,8 @@ mod legacy;
|
|||
pub struct ShardTree<S: ShardStore, const DEPTH: u8, const SHARD_HEIGHT: u8> {
|
||||
/// The vector of tree shards.
|
||||
store: S,
|
||||
/// The maximum number of checkpoints to retain before pruning.
|
||||
max_checkpoints: usize,
|
||||
/// The minumum number of checkpoints to retain when pruning.
|
||||
min_checkpoints_to_retain: usize,
|
||||
}
|
||||
|
||||
impl<
|
||||
|
@ -79,10 +79,10 @@ impl<
|
|||
> ShardTree<S, DEPTH, SHARD_HEIGHT>
|
||||
{
|
||||
/// Creates a new empty tree.
|
||||
pub fn new(store: S, max_checkpoints: usize) -> Self {
|
||||
pub fn new(store: S, min_checkpoints_to_retain: usize) -> Self {
|
||||
Self {
|
||||
store,
|
||||
max_checkpoints,
|
||||
min_checkpoints_to_retain,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -438,14 +438,19 @@ impl<
|
|||
.checkpoint_count()
|
||||
.map_err(ShardTreeError::Storage)?;
|
||||
trace!(
|
||||
"Tree has {} checkpoints, max is {}",
|
||||
"Tree has {} checkpoints, min to be retained is {}",
|
||||
checkpoint_count,
|
||||
self.max_checkpoints,
|
||||
self.min_checkpoints_to_retain,
|
||||
);
|
||||
if checkpoint_count > self.max_checkpoints {
|
||||
let retain_count = self.min_checkpoints_to_retain
|
||||
+ self
|
||||
.store
|
||||
.ensured_retained_count()
|
||||
.map_err(ShardTreeError::Storage)?;
|
||||
if checkpoint_count > retain_count {
|
||||
// Batch removals by subtree & create a list of the checkpoint identifiers that
|
||||
// will be removed from the checkpoints map.
|
||||
let remove_count = checkpoint_count - self.max_checkpoints;
|
||||
let remove_count = checkpoint_count - retain_count;
|
||||
let mut checkpoints_to_delete = vec![];
|
||||
let mut clear_positions: BTreeMap<Address, BTreeMap<Position, RetentionFlags>> =
|
||||
BTreeMap::new();
|
||||
|
@ -454,8 +459,10 @@ impl<
|
|||
// When removing is true, we are iterating through the range of
|
||||
// checkpoints being removed. When remove is false, we are
|
||||
// iterating through the range of checkpoints that are being
|
||||
// retained.
|
||||
let removing = checkpoints_to_delete.len() < remove_count;
|
||||
// retained, or skipping over a particular checkpoint that we
|
||||
// have been explicitly asked to retain.
|
||||
let removing = checkpoints_to_delete.len() < remove_count
|
||||
&& !self.store.should_retain(cid)?;
|
||||
|
||||
if removing {
|
||||
checkpoints_to_delete.push(cid.clone());
|
||||
|
@ -1177,9 +1184,9 @@ impl<
|
|||
/// Make a marked leaf at a position eligible to be pruned.
|
||||
///
|
||||
/// If the checkpoint associated with the specified identifier does not exist because the
|
||||
/// corresponding checkpoint would have been more than `max_checkpoints` deep, the removal is
|
||||
/// recorded as of the first existing checkpoint and the associated leaves will be pruned when
|
||||
/// that checkpoint is subsequently removed.
|
||||
/// corresponding checkpoint would have been more than `min_checkpoints_to_retain` deep, the
|
||||
/// removal is recorded as of the first existing checkpoint and the associated leaves will be
|
||||
/// pruned when that checkpoint is subsequently removed.
|
||||
///
|
||||
/// Returns `Ok(true)` if a mark was successfully removed from the leaf at the specified
|
||||
/// position, `Ok(false)` if the tree does not contain a leaf at the specified position or is
|
||||
|
@ -1253,7 +1260,7 @@ mod tests {
|
|||
};
|
||||
|
||||
use crate::{
|
||||
store::memory::MemoryShardStore,
|
||||
store::{memory::MemoryShardStore, ShardStore},
|
||||
testing::{
|
||||
arb_char_str, arb_shardtree, check_shard_sizes, check_shardtree_insertion,
|
||||
check_witness_with_pruned_subtrees,
|
||||
|
@ -1355,12 +1362,48 @@ mod tests {
|
|||
),
|
||||
Ok(()),
|
||||
);
|
||||
|
||||
// Append a leaf we want to retain
|
||||
assert_eq!(tree.append('e'.to_string(), Retention::Marked), Ok(()),);
|
||||
|
||||
// Now a few more leaves and then checkpoint
|
||||
for c in 'f'..='i' {
|
||||
tree.append(c.to_string(), Retention::Ephemeral).unwrap();
|
||||
}
|
||||
|
||||
// Checkpoint the tree. We'll want to retain this checkpoint.
|
||||
assert_eq!(tree.checkpoint(12), Ok(true));
|
||||
tree.store.ensure_retained(12).unwrap();
|
||||
|
||||
// Simulate adding yet another block
|
||||
for c in 'j'..='m' {
|
||||
tree.append(c.to_string(), Retention::Ephemeral).unwrap();
|
||||
}
|
||||
|
||||
assert_eq!(tree.checkpoint(13), Ok(true));
|
||||
|
||||
// Witness `e` as of checkpoint 12
|
||||
let e_witness_12 = tree
|
||||
.witness_at_checkpoint_id(Position::from(4), &12)
|
||||
.unwrap();
|
||||
|
||||
// Now add some more checkpoints, which would ordinarily cause checkpoint 12
|
||||
// to be pruned (but will not, because we explicitly retained it.)
|
||||
for i in 14..24 {
|
||||
assert_eq!(tree.checkpoint(i), Ok(true));
|
||||
}
|
||||
|
||||
// Verify that we can still compute the same root
|
||||
assert_matches!(
|
||||
tree.witness_at_checkpoint_id(Position::from(4), &12),
|
||||
Ok(w) if w == e_witness_12
|
||||
);
|
||||
}
|
||||
|
||||
// Combined tree tests
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn new_combined_tree<H: Hashable + Ord + Clone + core::fmt::Debug>(
|
||||
max_checkpoints: usize,
|
||||
min_checkpoints_to_retain: usize,
|
||||
) -> CombinedTree<
|
||||
H,
|
||||
usize,
|
||||
|
@ -1368,8 +1411,8 @@ mod tests {
|
|||
ShardTree<MemoryShardStore<H, usize>, 4, 3>,
|
||||
> {
|
||||
CombinedTree::new(
|
||||
CompleteTree::new(max_checkpoints),
|
||||
ShardTree::new(MemoryShardStore::empty(), max_checkpoints),
|
||||
CompleteTree::new(min_checkpoints_to_retain),
|
||||
ShardTree::new(MemoryShardStore::empty(), min_checkpoints_to_retain),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -23,8 +23,8 @@ bitflags! {
|
|||
/// [`MARKED`]: RetentionFlags::MARKED
|
||||
const EPHEMERAL = 0b00000000;
|
||||
|
||||
/// A leaf with `CHECKPOINT` retention can be pruned when there are more than `max_checkpoints`
|
||||
/// additional checkpoint leaves, if it is not also a marked leaf.
|
||||
/// A leaf with `CHECKPOINT` retention can be pruned when there are more than
|
||||
/// `min_checkpoints_to_retain` additional checkpoints, if it is not also a marked leaf.
|
||||
const CHECKPOINT = 0b00000001;
|
||||
|
||||
/// A leaf with `MARKED` retention can be pruned only as a consequence of an explicit deletion
|
||||
|
@ -34,10 +34,12 @@ bitflags! {
|
|||
}
|
||||
|
||||
impl RetentionFlags {
|
||||
/// Returns whether the [`RetentionFlags::CHECKPOINT`] flag is set.
|
||||
pub fn is_checkpoint(&self) -> bool {
|
||||
(*self & RetentionFlags::CHECKPOINT) == RetentionFlags::CHECKPOINT
|
||||
}
|
||||
|
||||
/// Returns whether the [`RetentionFlags::MARKED`] flag is set.
|
||||
pub fn is_marked(&self) -> bool {
|
||||
(*self & RetentionFlags::MARKED) == RetentionFlags::MARKED
|
||||
}
|
||||
|
|
|
@ -93,6 +93,21 @@ pub trait ShardStore {
|
|||
checkpoint: Checkpoint,
|
||||
) -> Result<(), Self::Error>;
|
||||
|
||||
/// Records the provided checkpoint ID as corresponding to a checkpoint to be retained across
|
||||
/// pruning operations.
|
||||
///
|
||||
/// Implementations of this method must add the provided checkpoint ID to the set of
|
||||
/// checkpoints to be retained even if no such checkpoint currently exists in the backing
|
||||
/// store.
|
||||
fn ensure_retained(&mut self, checkpoint_id: Self::CheckpointId) -> Result<(), Self::Error>;
|
||||
|
||||
/// Returns the number of checkpoints explicitly retained using [`ensure_retained`].
|
||||
fn ensured_retained_count(&self) -> Result<usize, Self::Error>;
|
||||
|
||||
/// Returns the set of identifiers for checkpoints that should be exempt from pruning
|
||||
/// operations.
|
||||
fn should_retain(&self, cid: &Self::CheckpointId) -> Result<bool, Self::Error>;
|
||||
|
||||
/// Returns the number of checkpoints maintained by the data store
|
||||
fn checkpoint_count(&self) -> Result<usize, Self::Error>;
|
||||
|
||||
|
@ -112,7 +127,7 @@ pub trait ShardStore {
|
|||
|
||||
/// Iterates in checkpoint ID order over the first `limit` checkpoints, applying the
|
||||
/// given callback to each.
|
||||
fn with_checkpoints<F>(&mut self, limit: usize, callback: F) -> Result<(), Self::Error>
|
||||
fn with_checkpoints<F>(&self, limit: usize, callback: F) -> Result<(), Self::Error>
|
||||
where
|
||||
F: FnMut(&Self::CheckpointId, &Checkpoint) -> Result<(), Self::Error>;
|
||||
|
||||
|
@ -165,6 +180,10 @@ impl<S: ShardStore> ShardStore for &mut S {
|
|||
S::get_shard_roots(*self)
|
||||
}
|
||||
|
||||
fn truncate(&mut self, from: Address) -> Result<(), Self::Error> {
|
||||
S::truncate(*self, from)
|
||||
}
|
||||
|
||||
fn get_cap(&self) -> Result<PrunableTree<Self::H>, Self::Error> {
|
||||
S::get_cap(*self)
|
||||
}
|
||||
|
@ -173,10 +192,6 @@ impl<S: ShardStore> ShardStore for &mut S {
|
|||
S::put_cap(*self, cap)
|
||||
}
|
||||
|
||||
fn truncate(&mut self, from: Address) -> Result<(), Self::Error> {
|
||||
S::truncate(*self, from)
|
||||
}
|
||||
|
||||
fn min_checkpoint_id(&self) -> Result<Option<Self::CheckpointId>, Self::Error> {
|
||||
S::min_checkpoint_id(self)
|
||||
}
|
||||
|
@ -193,6 +208,18 @@ impl<S: ShardStore> ShardStore for &mut S {
|
|||
S::add_checkpoint(self, checkpoint_id, checkpoint)
|
||||
}
|
||||
|
||||
fn ensure_retained(&mut self, checkpoint_id: Self::CheckpointId) -> Result<(), Self::Error> {
|
||||
S::ensure_retained(self, checkpoint_id)
|
||||
}
|
||||
|
||||
fn ensured_retained_count(&self) -> Result<usize, Self::Error> {
|
||||
S::ensured_retained_count(self)
|
||||
}
|
||||
|
||||
fn should_retain(&self, cid: &Self::CheckpointId) -> Result<bool, Self::Error> {
|
||||
S::should_retain(self, cid)
|
||||
}
|
||||
|
||||
fn checkpoint_count(&self) -> Result<usize, Self::Error> {
|
||||
S::checkpoint_count(self)
|
||||
}
|
||||
|
@ -211,11 +238,11 @@ impl<S: ShardStore> ShardStore for &mut S {
|
|||
S::get_checkpoint(self, checkpoint_id)
|
||||
}
|
||||
|
||||
fn with_checkpoints<F>(&mut self, limit: usize, callback: F) -> Result<(), Self::Error>
|
||||
fn with_checkpoints<F>(&self, limit: usize, mut callback: F) -> Result<(), Self::Error>
|
||||
where
|
||||
F: FnMut(&Self::CheckpointId, &Checkpoint) -> Result<(), Self::Error>,
|
||||
{
|
||||
S::with_checkpoints(self, limit, callback)
|
||||
S::with_checkpoints(self, limit, |cid, c| callback(cid, c))
|
||||
}
|
||||
|
||||
fn update_checkpoint_with<F>(
|
||||
|
|
|
@ -37,7 +37,7 @@ where
|
|||
S::CheckpointId: Clone + Ord,
|
||||
{
|
||||
/// Loads a `CachingShardStore` from the given backend.
|
||||
pub fn load(mut backend: S) -> Result<Self, S::Error> {
|
||||
pub fn load(backend: S) -> Result<Self, S::Error> {
|
||||
let mut cache = MemoryShardStore::empty();
|
||||
|
||||
for shard_root in backend.get_shard_roots()? {
|
||||
|
@ -94,6 +94,9 @@ where
|
|||
},
|
||||
)
|
||||
.unwrap();
|
||||
for cid in self.cache.checkpoints_to_retain() {
|
||||
self.backend.ensure_retained(cid.clone())?;
|
||||
}
|
||||
for (checkpoint_id, checkpoint) in checkpoints {
|
||||
self.backend.add_checkpoint(checkpoint_id, checkpoint)?;
|
||||
}
|
||||
|
@ -144,6 +147,14 @@ where
|
|||
self.cache.put_cap(cap)
|
||||
}
|
||||
|
||||
fn min_checkpoint_id(&self) -> Result<Option<Self::CheckpointId>, Self::Error> {
|
||||
self.cache.min_checkpoint_id()
|
||||
}
|
||||
|
||||
fn max_checkpoint_id(&self) -> Result<Option<Self::CheckpointId>, Self::Error> {
|
||||
self.cache.max_checkpoint_id()
|
||||
}
|
||||
|
||||
fn add_checkpoint(
|
||||
&mut self,
|
||||
checkpoint_id: Self::CheckpointId,
|
||||
|
@ -152,15 +163,20 @@ where
|
|||
self.cache.add_checkpoint(checkpoint_id, checkpoint)
|
||||
}
|
||||
|
||||
fn checkpoint_count(&self) -> Result<usize, Self::Error> {
|
||||
self.cache.checkpoint_count()
|
||||
fn ensure_retained(&mut self, checkpoint_id: Self::CheckpointId) -> Result<(), Self::Error> {
|
||||
self.cache.ensure_retained(checkpoint_id)
|
||||
}
|
||||
|
||||
fn get_checkpoint(
|
||||
&self,
|
||||
checkpoint_id: &Self::CheckpointId,
|
||||
) -> Result<Option<Checkpoint>, Self::Error> {
|
||||
self.cache.get_checkpoint(checkpoint_id)
|
||||
fn ensured_retained_count(&self) -> Result<usize, Self::Error> {
|
||||
self.cache.ensured_retained_count()
|
||||
}
|
||||
|
||||
fn should_retain(&self, cid: &Self::CheckpointId) -> Result<bool, Self::Error> {
|
||||
self.cache.should_retain(cid)
|
||||
}
|
||||
|
||||
fn checkpoint_count(&self) -> Result<usize, Self::Error> {
|
||||
self.cache.checkpoint_count()
|
||||
}
|
||||
|
||||
fn get_checkpoint_at_depth(
|
||||
|
@ -170,19 +186,19 @@ where
|
|||
self.cache.get_checkpoint_at_depth(checkpoint_depth)
|
||||
}
|
||||
|
||||
fn min_checkpoint_id(&self) -> Result<Option<Self::CheckpointId>, Self::Error> {
|
||||
self.cache.min_checkpoint_id()
|
||||
fn get_checkpoint(
|
||||
&self,
|
||||
checkpoint_id: &Self::CheckpointId,
|
||||
) -> Result<Option<Checkpoint>, Self::Error> {
|
||||
self.cache.get_checkpoint(checkpoint_id)
|
||||
}
|
||||
|
||||
fn max_checkpoint_id(&self) -> Result<Option<Self::CheckpointId>, Self::Error> {
|
||||
self.cache.max_checkpoint_id()
|
||||
}
|
||||
|
||||
fn with_checkpoints<F>(&mut self, limit: usize, callback: F) -> Result<(), Self::Error>
|
||||
fn with_checkpoints<F>(&self, limit: usize, mut callback: F) -> Result<(), Self::Error>
|
||||
where
|
||||
F: FnMut(&Self::CheckpointId, &Checkpoint) -> Result<(), Self::Error>,
|
||||
{
|
||||
self.cache.with_checkpoints(limit, callback)
|
||||
self.cache
|
||||
.with_checkpoints(limit, |cid, c| callback(cid, c))
|
||||
}
|
||||
|
||||
fn update_checkpoint_with<F>(
|
||||
|
@ -229,7 +245,7 @@ mod tests {
|
|||
};
|
||||
|
||||
fn check_equal(
|
||||
mut lhs: MemoryShardStore<String, u64>,
|
||||
lhs: MemoryShardStore<String, u64>,
|
||||
rhs: CachingShardStore<MemoryShardStore<String, u64>>,
|
||||
) {
|
||||
let rhs = rhs.flush().unwrap();
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! Implementation of an in-memory shard store with no persistence.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::convert::{Infallible, TryFrom};
|
||||
|
||||
use incrementalmerkletree::Address;
|
||||
|
@ -16,6 +16,7 @@ pub struct MemoryShardStore<H, C: Ord> {
|
|||
shards: Vec<LocatedPrunableTree<H>>,
|
||||
checkpoints: BTreeMap<C, Checkpoint>,
|
||||
cap: PrunableTree<H>,
|
||||
to_retain: BTreeSet<C>,
|
||||
}
|
||||
|
||||
impl<H, C: Ord> MemoryShardStore<H, C> {
|
||||
|
@ -25,8 +26,14 @@ impl<H, C: Ord> MemoryShardStore<H, C> {
|
|||
shards: vec![],
|
||||
checkpoints: BTreeMap::new(),
|
||||
cap: PrunableTree::empty(),
|
||||
to_retain: BTreeSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the set of checkpoints being explicitly retained.
|
||||
pub fn checkpoints_to_retain(&self) -> &BTreeSet<C> {
|
||||
&self.to_retain
|
||||
}
|
||||
}
|
||||
|
||||
impl<H: Clone, C: Clone + Ord> ShardStore for MemoryShardStore<H, C> {
|
||||
|
@ -92,6 +99,19 @@ impl<H: Clone, C: Clone + Ord> ShardStore for MemoryShardStore<H, C> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn ensure_retained(&mut self, checkpoint_id: Self::CheckpointId) -> Result<(), Self::Error> {
|
||||
self.to_retain.insert(checkpoint_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ensured_retained_count(&self) -> Result<usize, Self::Error> {
|
||||
Ok(self.to_retain.len())
|
||||
}
|
||||
|
||||
fn should_retain(&self, cid: &Self::CheckpointId) -> Result<bool, Self::Error> {
|
||||
Ok(self.to_retain.contains(cid))
|
||||
}
|
||||
|
||||
fn checkpoint_count(&self) -> Result<usize, Self::Error> {
|
||||
Ok(self.checkpoints.len())
|
||||
}
|
||||
|
@ -126,7 +146,7 @@ impl<H: Clone, C: Clone + Ord> ShardStore for MemoryShardStore<H, C> {
|
|||
Ok(self.checkpoints.keys().last().cloned())
|
||||
}
|
||||
|
||||
fn with_checkpoints<F>(&mut self, limit: usize, mut callback: F) -> Result<(), Self::Error>
|
||||
fn with_checkpoints<F>(&self, limit: usize, mut callback: F) -> Result<(), Self::Error>
|
||||
where
|
||||
F: FnMut(&C, &Checkpoint) -> Result<(), Self::Error>,
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue