Add checkpoint management APIs to `ShardStore`.

This commit is contained in:
Kris Nuttycombe 2023-05-03 13:35:40 -06:00
parent ed78bc2e56
commit e15440bd37
1 changed files with 350 additions and 161 deletions

View File

@ -1393,7 +1393,7 @@ impl Checkpoint {
/// A capability for storage of fragment subtrees of the `ShardTree` type. /// A capability for storage of fragment subtrees of the `ShardTree` type.
/// ///
/// All fragment subtrees must have roots at level `SHARD_HEIGHT - 1` /// All fragment subtrees must have roots at level `SHARD_HEIGHT - 1`
pub trait ShardStore<H> { pub trait ShardStore<H, C> {
type Error; type Error;
/// Returns the subtree at the given root address, if any such subtree exists. /// Returns the subtree at the given root address, if any such subtree exists.
@ -1418,20 +1418,149 @@ pub trait ShardStore<H> {
/// Implementations of this method MUST enforce the constraint that the root address /// Implementations of this method MUST enforce the constraint that the root address
/// provided has level `SHARD_HEIGHT - 1`. /// provided has level `SHARD_HEIGHT - 1`.
fn truncate(&mut self, from: Address) -> Result<(), Self::Error>; fn truncate(&mut self, from: Address) -> Result<(), Self::Error>;
// /// TODO: Add a tree that is used to cache the known roots of subtrees in the "cap" of nodes between
// /// `SHARD_HEIGHT` and `DEPTH` that are otherwise not directly represented in the tree. This
// /// cache will be automatically updated when computing roots and witnesses. Leaf nodes are empty
// /// because the annotation slot is consistently used to store the subtree hashes at each node.
// cap_cache: Tree<Option<Rc<H>>, ()>
/// Returns the identifier for the checkpoint with the lowest associated position value.
fn min_checkpoint_id(&self) -> Option<&C>;
/// Returns the identifier for the checkpoint with the highest associated position value.
fn max_checkpoint_id(&self) -> Option<&C>;
/// Adds a checkpoint to the data store.
fn add_checkpoint(
&mut self,
checkpoint_id: C,
checkpoint: Checkpoint,
) -> Result<(), Self::Error>;
/// Returns the number of checkpoints maintained by the data store
fn checkpoint_count(&self) -> Result<usize, Self::Error>;
/// Returns the position of the checkpoint, if any, along with the number of subsequent
/// checkpoints at the same position. Returns `None` if `checkpoint_depth == 0` or if
/// insufficient checkpoints exist to seek back to the requested depth.
fn get_checkpoint_at_depth(&self, checkpoint_depth: usize) -> Option<(&C, &Checkpoint)>;
/// Iterates in checkpoint ID order over the first `limit` checkpoints, applying the
/// given callback to each.
fn with_checkpoints<F>(&mut self, limit: usize, callback: F) -> Result<(), Self::Error>
where
F: FnMut(&C, &Checkpoint) -> Result<(), Self::Error>;
/// Update the checkpoint having the given identifier by mutating it with the provided
/// function, and persist the updated checkpoint to the data store.
///
/// Returns `Ok(true)` if the checkpoint was found, `Ok(false)` if no checkpoint with the
/// provided identifier exists in the data store, or an error if a storage error occurred.
fn update_checkpoint_with<F>(
&mut self,
checkpoint_id: &C,
update: F,
) -> Result<bool, Self::Error>
where
F: Fn(&mut Checkpoint) -> Result<(), Self::Error>;
/// Removes a checkpoint from the data store.
fn remove_checkpoint(&mut self, checkpoint_id: &C) -> Result<(), Self::Error>;
/// Removes checkpoints with identifiers greater than or equal to the given identifier
fn truncate_checkpoints(&mut self, checkpoint_id: &C) -> Result<(), Self::Error>;
} }
#[derive(Debug)] impl<H, C, S: ShardStore<H, C>> ShardStore<H, C> for &mut S {
pub struct MemoryShardStore<H> { type Error = S::Error;
shards: Vec<LocatedPrunableTree<H>>, fn get_shard(&self, shard_root: Address) -> Option<&LocatedPrunableTree<H>> {
} S::get_shard(*self, shard_root)
}
impl<H> MemoryShardStore<H> { fn last_shard(&self) -> Option<&LocatedPrunableTree<H>> {
pub fn empty() -> Self { S::last_shard(*self)
Self { shards: vec![] } }
fn put_shard(&mut self, subtree: LocatedPrunableTree<H>) -> Result<(), Self::Error> {
S::put_shard(*self, subtree)
}
fn get_shard_roots(&self) -> Vec<Address> {
S::get_shard_roots(*self)
}
fn truncate(&mut self, from: Address) -> Result<(), Self::Error> {
S::truncate(*self, from)
}
fn min_checkpoint_id(&self) -> Option<&C> {
S::min_checkpoint_id(self)
}
fn max_checkpoint_id(&self) -> Option<&C> {
S::max_checkpoint_id(self)
}
fn add_checkpoint(
&mut self,
checkpoint_id: C,
checkpoint: Checkpoint,
) -> Result<(), Self::Error> {
S::add_checkpoint(self, checkpoint_id, checkpoint)
}
fn checkpoint_count(&self) -> Result<usize, Self::Error> {
S::checkpoint_count(self)
}
fn get_checkpoint_at_depth(&self, checkpoint_depth: usize) -> Option<(&C, &Checkpoint)> {
S::get_checkpoint_at_depth(self, checkpoint_depth)
}
fn with_checkpoints<F>(&mut self, limit: usize, callback: F) -> Result<(), Self::Error>
where
F: FnMut(&C, &Checkpoint) -> Result<(), Self::Error>,
{
S::with_checkpoints(self, limit, callback)
}
fn update_checkpoint_with<F>(
&mut self,
checkpoint_id: &C,
update: F,
) -> Result<bool, Self::Error>
where
F: Fn(&mut Checkpoint) -> Result<(), Self::Error>,
{
S::update_checkpoint_with(self, checkpoint_id, update)
}
fn remove_checkpoint(&mut self, checkpoint_id: &C) -> Result<(), Self::Error> {
S::remove_checkpoint(self, checkpoint_id)
}
fn truncate_checkpoints(&mut self, checkpoint_id: &C) -> Result<(), Self::Error> {
S::truncate_checkpoints(self, checkpoint_id)
} }
} }
impl<H> ShardStore<H> for MemoryShardStore<H> { #[derive(Debug)]
pub struct MemoryShardStore<H, C: Ord> {
shards: Vec<LocatedPrunableTree<H>>,
checkpoints: BTreeMap<C, Checkpoint>,
}
impl<H, C: Ord> MemoryShardStore<H, C> {
pub fn empty() -> Self {
Self {
shards: vec![],
checkpoints: BTreeMap::new(),
}
}
}
impl<H, C: Ord> ShardStore<H, C> for MemoryShardStore<H, C> {
type Error = Infallible; type Error = Infallible;
fn get_shard(&self, shard_root: Address) -> Option<&LocatedPrunableTree<H>> { fn get_shard(&self, shard_root: Address) -> Option<&LocatedPrunableTree<H>> {
@ -1470,6 +1599,72 @@ impl<H> ShardStore<H> for MemoryShardStore<H> {
self.shards.truncate(shard_idx); self.shards.truncate(shard_idx);
Ok(()) Ok(())
} }
fn add_checkpoint(
&mut self,
checkpoint_id: C,
checkpoint: Checkpoint,
) -> Result<(), Self::Error> {
self.checkpoints.insert(checkpoint_id, checkpoint);
Ok(())
}
fn checkpoint_count(&self) -> Result<usize, Self::Error> {
Ok(self.checkpoints.len())
}
fn get_checkpoint_at_depth(&self, checkpoint_depth: usize) -> Option<(&C, &Checkpoint)> {
if checkpoint_depth == 0 {
None
} else {
self.checkpoints.iter().rev().nth(checkpoint_depth - 1)
}
}
fn min_checkpoint_id(&self) -> Option<&C> {
self.checkpoints.keys().next()
}
fn max_checkpoint_id(&self) -> Option<&C> {
self.checkpoints.keys().last()
}
fn with_checkpoints<F>(&mut self, limit: usize, mut callback: F) -> Result<(), Self::Error>
where
F: FnMut(&C, &Checkpoint) -> Result<(), Self::Error>,
{
for (cid, checkpoint) in self.checkpoints.iter().take(limit) {
callback(cid, checkpoint)?
}
Ok(())
}
fn update_checkpoint_with<F>(
&mut self,
checkpoint_id: &C,
update: F,
) -> Result<bool, Self::Error>
where
F: Fn(&mut Checkpoint) -> Result<(), Self::Error>,
{
if let Some(c) = self.checkpoints.get_mut(checkpoint_id) {
update(c)?;
return Ok(true);
}
return Ok(false);
}
fn remove_checkpoint(&mut self, checkpoint_id: &C) -> Result<(), Self::Error> {
self.checkpoints.remove(checkpoint_id);
Ok(())
}
fn truncate_checkpoints(&mut self, checkpoint_id: &C) -> Result<(), Self::Error> {
self.checkpoints.split_off(checkpoint_id);
Ok(())
}
} }
/// A sparse binary Merkle tree of the specified depth, represented as an ordered collection of /// A sparse binary Merkle tree of the specified depth, represented as an ordered collection of
@ -1479,61 +1674,39 @@ impl<H> ShardStore<H> for MemoryShardStore<H> {
/// front of the tree, that are maintained such that it's possible to truncate nodes to the right /// front of the tree, that are maintained such that it's possible to truncate nodes to the right
/// of the specified position. /// of the specified position.
#[derive(Debug)] #[derive(Debug)]
pub struct ShardTree<H, C: Ord, S: ShardStore<H>, const DEPTH: u8, const SHARD_HEIGHT: u8> { pub struct ShardTree<H, C, S: ShardStore<H, C>, const DEPTH: u8, const SHARD_HEIGHT: u8> {
/// The vector of tree shards. /// The vector of tree shards.
store: S, store: S,
/// The maximum number of checkpoints to retain before pruning. /// The maximum number of checkpoints to retain before pruning.
max_checkpoints: usize, max_checkpoints: usize,
/// An ordered map from checkpoint identifier to checkpoint.
checkpoints: BTreeMap<C, Checkpoint>,
// /// TODO: Add a tree that is used to cache the known roots of subtrees in the "cap" of nodes between
// /// `SHARD_HEIGHT` and `DEPTH` that are otherwise not directly represented in the tree. This
// /// cache will be automatically updated when computing roots and witnesses. Leaf nodes are empty
// /// because the annotation slot is consistently used to store the subtree hashes at each node.
// cap_cache: Tree<Option<Rc<H>>, ()>
_hash_type: PhantomData<H>, _hash_type: PhantomData<H>,
} _checkpoint_id_type: PhantomData<C>,
impl<H, S: ShardStore<H>> ShardStore<H> for &mut S {
type Error = S::Error;
fn get_shard(&self, shard_root: Address) -> Option<&LocatedPrunableTree<H>> {
S::get_shard(*self, shard_root)
}
fn last_shard(&self) -> Option<&LocatedPrunableTree<H>> {
S::last_shard(*self)
}
fn put_shard(&mut self, subtree: LocatedPrunableTree<H>) -> Result<(), Self::Error> {
S::put_shard(*self, subtree)
}
fn get_shard_roots(&self) -> Vec<Address> {
S::get_shard_roots(*self)
}
fn truncate(&mut self, from: Address) -> Result<(), Self::Error> {
S::truncate(*self, from)
}
} }
impl< impl<
H: Hashable + Clone + PartialEq, H: Hashable + Clone + PartialEq,
C: Clone + Ord, C: Clone + Ord,
S: ShardStore<H>, S: ShardStore<H, C>,
const DEPTH: u8, const DEPTH: u8,
const SHARD_HEIGHT: u8, const SHARD_HEIGHT: u8,
> ShardTree<H, C, S, DEPTH, SHARD_HEIGHT> > ShardTree<H, C, S, DEPTH, SHARD_HEIGHT>
{ {
/// Creates a new empty tree. /// Creates a new empty tree.
pub fn new(store: S, max_checkpoints: usize, initial_checkpoint_id: C) -> Self { pub fn new(
Self { store: S,
max_checkpoints: usize,
initial_checkpoint_id: C,
) -> Result<Self, S::Error> {
let mut result = Self {
store, store,
max_checkpoints, max_checkpoints,
checkpoints: BTreeMap::from([(initial_checkpoint_id, Checkpoint::tree_empty())]),
//cap_cache: Tree(None, ())
_hash_type: PhantomData, _hash_type: PhantomData,
} _checkpoint_id_type: PhantomData,
};
result
.store
.add_checkpoint(initial_checkpoint_id, Checkpoint::tree_empty())?;
Ok(result)
} }
/// Returns the root address of the tree. /// Returns the root address of the tree.
@ -1547,11 +1720,6 @@ impl<
Level::from(SHARD_HEIGHT - 1) Level::from(SHARD_HEIGHT - 1)
} }
/// Returns the position and checkpoint count for each checkpointed position in the tree.
pub fn checkpoints(&self) -> &BTreeMap<C, Checkpoint> {
&self.checkpoints
}
/// Returns the leaf value at the specified position, if it is a marked leaf. /// Returns the leaf value at the specified position, if it is a marked leaf.
pub fn get_marked_leaf(&self, position: Position) -> Option<&H> { pub fn get_marked_leaf(&self, position: Position) -> Option<&H> {
self.store self.store
@ -1624,9 +1792,12 @@ impl<
&mut self, &mut self,
value: H, value: H,
retention: Retention<C>, retention: Retention<C>,
) -> Result<(), InsertionError<S::Error>> { ) -> Result<(), InsertionError<S::Error>>
where
S::Error: Debug,
{
if let Retention::Checkpoint { id, .. } = &retention { if let Retention::Checkpoint { id, .. } = &retention {
if self.checkpoints.keys().last() >= Some(id) { if self.store.max_checkpoint_id() >= Some(id) {
return Err(InsertionError::CheckpointOutOfOrder); return Err(InsertionError::CheckpointOutOfOrder);
} }
} }
@ -1653,8 +1824,9 @@ impl<
.put_shard(append_result) .put_shard(append_result)
.map_err(InsertionError::Storage)?; .map_err(InsertionError::Storage)?;
if let Some(c) = checkpoint_id { if let Some(c) = checkpoint_id {
self.checkpoints self.store
.insert(c, Checkpoint::at_position(position)); .add_checkpoint(c, Checkpoint::at_position(position))
.map_err(InsertionError::Storage)?;
} }
self.prune_excess_checkpoints() self.prune_excess_checkpoints()
@ -1682,7 +1854,10 @@ impl<
&mut self, &mut self,
mut start: Position, mut start: Position,
values: I, values: I,
) -> Result<Option<(Position, Vec<IncompleteAt>)>, InsertionError<S::Error>> { ) -> Result<Option<(Position, Vec<IncompleteAt>)>, InsertionError<S::Error>>
where
S::Error: Debug,
{
let mut values = values.peekable(); let mut values = values.peekable();
let mut subtree_root_addr = Address::above_position(Self::subtree_level(), start); let mut subtree_root_addr = Address::above_position(Self::subtree_level(), start);
let mut max_insert_position = None; let mut max_insert_position = None;
@ -1702,8 +1877,9 @@ impl<
.put_shard(res.subtree) .put_shard(res.subtree)
.map_err(InsertionError::Storage)?; .map_err(InsertionError::Storage)?;
for (id, position) in res.checkpoints.into_iter() { for (id, position) in res.checkpoints.into_iter() {
self.checkpoints self.store
.insert(id, Checkpoint::at_position(position)); .add_checkpoint(id, Checkpoint::at_position(position))
.map_err(InsertionError::Storage)?;
} }
values = res.remainder; values = res.remainder;
@ -1718,7 +1894,6 @@ impl<
self.prune_excess_checkpoints() self.prune_excess_checkpoints()
.map_err(InsertionError::Storage)?; .map_err(InsertionError::Storage)?;
Ok(max_insert_position.map(|p| (p, all_incomplete))) Ok(max_insert_position.map(|p| (p, all_incomplete)))
} }
@ -1747,7 +1922,10 @@ impl<
} }
/// Adds a checkpoint at the rightmost leaf state of the tree. /// Adds a checkpoint at the rightmost leaf state of the tree.
pub fn checkpoint(&mut self, checkpoint_id: C) -> bool { pub fn checkpoint(&mut self, checkpoint_id: C) -> Result<bool, InsertionError<S::Error>>
where
S::Error: Debug,
{
fn go<H: Hashable + Clone + PartialEq>( fn go<H: Hashable + Clone + PartialEq>(
root_addr: Address, root_addr: Address,
root: &PrunableTree<H>, root: &PrunableTree<H>,
@ -1793,8 +1971,8 @@ impl<
} }
// checkpoint identifiers at the tip must be in increasing order // checkpoint identifiers at the tip must be in increasing order
if self.checkpoints.keys().last() >= Some(&checkpoint_id) { if self.store.max_checkpoint_id() >= Some(&checkpoint_id) {
return false; return Ok(false);
} }
// Search backward from the end of the subtrees iter to find a non-empty subtree. // Search backward from the end of the subtrees iter to find a non-empty subtree.
@ -1814,67 +1992,75 @@ impl<
}) })
.is_err() .is_err()
{ {
return false; return Ok(false);
} }
self.checkpoints self.store
.insert(checkpoint_id, Checkpoint::at_position(checkpoint_position)); .add_checkpoint(checkpoint_id, Checkpoint::at_position(checkpoint_position))
.map_err(InsertionError::Storage)?;
// early return once we've updated the tree state // early return once we've updated the tree state
return self self.prune_excess_checkpoints()
.prune_excess_checkpoints() .map_err(InsertionError::Storage)?;
.map_err(InsertionError::Storage) return Ok(true);
.is_ok();
} }
} }
self.checkpoints self.store
.insert(checkpoint_id, Checkpoint::tree_empty()); .add_checkpoint(checkpoint_id, Checkpoint::tree_empty())
.map_err(InsertionError::Storage)?;
// TODO: it should not be necessary to do this on every checkpoint, // TODO: it should not be necessary to do this on every checkpoint,
// but currently that's how the reference tree behaves so we're maintaining // but currently that's how the reference tree behaves so we're maintaining
// those semantics for test compatibility. // those semantics for test compatibility.
self.prune_excess_checkpoints() self.prune_excess_checkpoints()
.map_err(InsertionError::Storage) .map_err(InsertionError::Storage)?;
.is_ok() Ok(true)
} }
fn prune_excess_checkpoints(&mut self) -> Result<(), S::Error> { fn prune_excess_checkpoints(&mut self) -> Result<(), S::Error>
if self.checkpoints.len() > self.max_checkpoints { where
S::Error: Debug,
{
let checkpoint_count = self.store.checkpoint_count()?;
if checkpoint_count > self.max_checkpoints {
// Batch removals by subtree & create a list of the checkpoint identifiers that // Batch removals by subtree & create a list of the checkpoint identifiers that
// will be removed from the checkpoints map. // will be removed from the checkpoints map.
let mut checkpoints_to_delete = vec![]; let mut checkpoints_to_delete = vec![];
let mut clear_positions: BTreeMap<Address, BTreeMap<Position, RetentionFlags>> = let mut clear_positions: BTreeMap<Address, BTreeMap<Position, RetentionFlags>> =
BTreeMap::new(); BTreeMap::new();
for (cid, checkpoint) in self self.store
.checkpoints .with_checkpoints(
.iter() checkpoint_count - self.max_checkpoints,
.take(self.checkpoints.len() - self.max_checkpoints) |cid, checkpoint| {
{ checkpoints_to_delete.push(cid.clone());
checkpoints_to_delete.push(cid.clone());
let mut clear_at = |pos, flags_to_clear| { let mut clear_at = |pos, flags_to_clear| {
let subtree_addr = Address::above_position(Self::subtree_level(), pos); let subtree_addr = Address::above_position(Self::subtree_level(), pos);
clear_positions clear_positions
.entry(subtree_addr) .entry(subtree_addr)
.and_modify(|to_clear| { .and_modify(|to_clear| {
to_clear to_clear
.entry(pos) .entry(pos)
.and_modify(|flags| *flags |= flags_to_clear) .and_modify(|flags| *flags |= flags_to_clear)
.or_insert(flags_to_clear); .or_insert(flags_to_clear);
}) })
.or_insert_with(|| BTreeMap::from([(pos, flags_to_clear)])); .or_insert_with(|| BTreeMap::from([(pos, flags_to_clear)]));
}; };
// clear the checkpoint leaf // clear the checkpoint leaf
if let TreeState::AtPosition(pos) = checkpoint.tree_state { if let TreeState::AtPosition(pos) = checkpoint.tree_state {
clear_at(pos, RetentionFlags::CHECKPOINT) clear_at(pos, RetentionFlags::CHECKPOINT)
} }
// clear the leaves that have been marked for removal // clear the leaves that have been marked for removal
for unmark_pos in checkpoint.marks_removed.iter() { for unmark_pos in checkpoint.marks_removed.iter() {
clear_at(*unmark_pos, RetentionFlags::MARKED) clear_at(*unmark_pos, RetentionFlags::MARKED)
} }
}
Ok(())
},
)
.expect("The provided function is infallible.");
// Prune each affected subtree // Prune each affected subtree
for (subtree_addr, positions) in clear_positions.into_iter() { for (subtree_addr, positions) in clear_positions.into_iter() {
@ -1889,7 +2075,7 @@ impl<
// Now that the leaves have been pruned, actually remove the checkpoints // Now that the leaves have been pruned, actually remove the checkpoints
for c in checkpoints_to_delete { for c in checkpoints_to_delete {
self.checkpoints.remove(&c); self.store.remove_checkpoint(&c)?;
} }
} }
@ -1901,23 +2087,21 @@ impl<
/// This will also discard all checkpoints with depth <= the specified depth. Returns `true` /// This will also discard all checkpoints with depth <= the specified depth. Returns `true`
/// if the truncation succeeds or has no effect, or `false` if no checkpoint exists at the /// if the truncation succeeds or has no effect, or `false` if no checkpoint exists at the
/// specified depth. /// specified depth.
pub fn truncate_removing_checkpoint(&mut self, checkpoint_depth: usize) -> bool { pub fn truncate_removing_checkpoint(
&mut self,
checkpoint_depth: usize,
) -> Result<bool, S::Error> {
if checkpoint_depth == 0 { if checkpoint_depth == 0 {
true Ok(true)
} else if self.checkpoints.len() > 1 { } else if self.store.checkpoint_count()? > 1 {
match self.checkpoint_at_depth(checkpoint_depth) { Ok(match self.store.get_checkpoint_at_depth(checkpoint_depth) {
Some((checkpoint_id, c)) => { Some((checkpoint_id, c)) => {
let checkpoint_id = checkpoint_id.clone(); let checkpoint_id = checkpoint_id.clone();
match c.tree_state { match c.tree_state {
TreeState::Empty => { TreeState::Empty => {
if (self self.store
.store .truncate(Address::from_parts(Self::subtree_level(), 0))?;
.truncate(Address::from_parts(Self::subtree_level(), 0))) self.store.truncate_checkpoints(&checkpoint_id)?;
.is_err()
{
return false;
}
self.checkpoints.split_off(&checkpoint_id);
true true
} }
TreeState::AtPosition(position) => { TreeState::AtPosition(position) => {
@ -1927,16 +2111,13 @@ impl<
.store .store
.get_shard(subtree_addr) .get_shard(subtree_addr)
.and_then(|s| s.truncate_to_position(position)); .and_then(|s| s.truncate_to_position(position));
match replacement { match replacement {
Some(truncated) => { Some(truncated) => {
if self.store.truncate(subtree_addr).is_err() self.store.truncate(subtree_addr)?;
|| self.store.put_shard(truncated).is_err() self.store.put_shard(truncated)?;
{ self.store.truncate_checkpoints(&checkpoint_id)?;
false true
} else {
self.checkpoints.split_off(&checkpoint_id);
true
}
} }
None => false, None => false,
} }
@ -1944,10 +2125,9 @@ impl<
} }
} }
None => false, None => false,
} })
} else { } else {
// do not remove the first checkpoint. Ok(false)
false
} }
} }
@ -2060,17 +2240,6 @@ impl<
} }
} }
/// Returns the position of the checkpoint, if any, along with the number of subsequent
/// checkpoints at the same position. Returns `None` if `checkpoint_depth == 0` or if
/// insufficient checkpoints exist to seek back to the requested depth.
pub fn checkpoint_at_depth(&self, checkpoint_depth: usize) -> Option<(&C, &Checkpoint)> {
if checkpoint_depth == 0 {
None
} else {
self.checkpoints.iter().rev().nth(checkpoint_depth - 1)
}
}
/// Returns the position of the rightmost leaf inserted as of the given checkpoint. /// Returns the position of the rightmost leaf inserted as of the given checkpoint.
/// ///
/// Returns the maximum leaf position if `checkpoint_depth == 0` (or `Ok(None)` in this /// Returns the maximum leaf position if `checkpoint_depth == 0` (or `Ok(None)` in this
@ -2088,7 +2257,7 @@ impl<
// just store it directly. // just store it directly.
Ok(self.store.last_shard().and_then(|t| t.max_position())) Ok(self.store.last_shard().and_then(|t| t.max_position()))
} else { } else {
match self.checkpoint_at_depth(checkpoint_depth) { match self.store.get_checkpoint_at_depth(checkpoint_depth) {
Some((_, c)) => Ok(c.position()), Some((_, c)) => Ok(c.position()),
None => { None => {
// There is no checkpoint at the specified depth, so we report it as pruned. // There is no checkpoint at the specified depth, so we report it as pruned.
@ -2156,20 +2325,33 @@ impl<
/// corresponding checkpoint would have been more than `max_checkpoints` deep, the removal /// corresponding checkpoint would have been more than `max_checkpoints` deep, the removal
/// is recorded as of the first existing checkpoint and the associated leaves will be pruned /// is recorded as of the first existing checkpoint and the associated leaves will be pruned
/// when that checkpoint is subsequently removed. /// when that checkpoint is subsequently removed.
pub fn remove_mark(&mut self, position: Position, as_of_checkpoint: &C) -> bool { pub fn remove_mark(
&mut self,
position: Position,
as_of_checkpoint: &C,
) -> Result<bool, S::Error> {
if self.get_marked_leaf(position).is_some() { if self.get_marked_leaf(position).is_some() {
if let Some(checkpoint) = self.checkpoints.get_mut(as_of_checkpoint) { if self
checkpoint.marks_removed.insert(position); .store
return true; .update_checkpoint_with(as_of_checkpoint, |checkpoint| {
checkpoint.marks_removed.insert(position);
Ok(())
})?
{
return Ok(true);
} }
if let Some((_, checkpoint)) = self.checkpoints.iter_mut().next() { if let Some(cid) = self.store.min_checkpoint_id().cloned() {
checkpoint.marks_removed.insert(position); if self.store.update_checkpoint_with(&cid, |checkpoint| {
return true; checkpoint.marks_removed.insert(position);
Ok(())
})? {
return Ok(true);
}
} }
} }
false Ok(false)
} }
} }
@ -2569,8 +2751,8 @@ mod tests {
#[test] #[test]
fn shardtree_insertion() { fn shardtree_insertion() {
let mut tree: ShardTree<String, usize, MemoryShardStore<String>, 4, 3> = let mut tree: ShardTree<String, usize, MemoryShardStore<String, usize>, 4, 3> =
ShardTree::new(MemoryShardStore::empty(), 100, 0); ShardTree::new(MemoryShardStore::empty(), 100, 0).unwrap();
assert_matches!( assert_matches!(
tree.batch_insert( tree.batch_insert(
Position::from(1), Position::from(1),
@ -2655,16 +2837,18 @@ mod tests {
] ]
); );
assert!(tree.truncate_removing_checkpoint(1)); assert_matches!(tree.truncate_removing_checkpoint(1), Ok(true));
} }
impl< impl<
H: Hashable + Ord + Clone, H: Hashable + Ord + Clone,
C: Clone + Ord + core::fmt::Debug, C: Clone + Ord + core::fmt::Debug,
S: ShardStore<H>, S: ShardStore<H, C>,
const DEPTH: u8, const DEPTH: u8,
const SHARD_HEIGHT: u8, const SHARD_HEIGHT: u8,
> testing::Tree<H, C> for ShardTree<H, C, S, DEPTH, SHARD_HEIGHT> > testing::Tree<H, C> for ShardTree<H, C, S, DEPTH, SHARD_HEIGHT>
where
S::Error: core::fmt::Debug,
{ {
fn depth(&self) -> u8 { fn depth(&self) -> u8 {
DEPTH DEPTH
@ -2697,74 +2881,79 @@ mod tests {
} }
fn remove_mark(&mut self, position: Position) -> bool { fn remove_mark(&mut self, position: Position) -> bool {
if let Some(c) = self.checkpoints.iter().rev().map(|(c, _)| c.clone()).next() { if let Some(c) = self.store.max_checkpoint_id().cloned() {
ShardTree::remove_mark(self, position, &c) ShardTree::remove_mark(self, position, &c).unwrap()
} else { } else {
false false
} }
} }
fn checkpoint(&mut self, checkpoint_id: C) -> bool { fn checkpoint(&mut self, checkpoint_id: C) -> bool {
ShardTree::checkpoint(self, checkpoint_id) ShardTree::checkpoint(self, checkpoint_id).unwrap()
} }
fn rewind(&mut self) -> bool { fn rewind(&mut self) -> bool {
ShardTree::truncate_removing_checkpoint(self, 1) ShardTree::truncate_removing_checkpoint(self, 1).unwrap()
} }
} }
#[test] #[test]
fn append() { fn append() {
check_append(|m| { check_append(|m| {
ShardTree::<String, usize, MemoryShardStore<String>, 4, 3>::new( ShardTree::<String, usize, MemoryShardStore<String, usize>, 4, 3>::new(
MemoryShardStore::empty(), MemoryShardStore::empty(),
m, m,
0, 0,
) )
.unwrap()
}); });
} }
#[test] #[test]
fn root_hashes() { fn root_hashes() {
check_root_hashes(|m| { check_root_hashes(|m| {
ShardTree::<String, usize, MemoryShardStore<String>, 4, 3>::new( ShardTree::<String, usize, MemoryShardStore<String, usize>, 4, 3>::new(
MemoryShardStore::empty(), MemoryShardStore::empty(),
m, m,
0, 0,
) )
.unwrap()
}); });
} }
#[test] #[test]
fn witnesses() { fn witnesses() {
check_witnesses(|m| { check_witnesses(|m| {
ShardTree::<String, usize, MemoryShardStore<String>, 4, 3>::new( ShardTree::<String, usize, MemoryShardStore<String, usize>, 4, 3>::new(
MemoryShardStore::empty(), MemoryShardStore::empty(),
m, m,
0, 0,
) )
.unwrap()
}); });
} }
#[test] #[test]
fn checkpoint_rewind() { fn checkpoint_rewind() {
check_checkpoint_rewind(|m| { check_checkpoint_rewind(|m| {
ShardTree::<String, usize, MemoryShardStore<String>, 4, 3>::new( ShardTree::<String, usize, MemoryShardStore<String, usize>, 4, 3>::new(
MemoryShardStore::empty(), MemoryShardStore::empty(),
m, m,
0, 0,
) )
.unwrap()
}); });
} }
#[test] #[test]
fn rewind_remove_mark() { fn rewind_remove_mark() {
check_rewind_remove_mark(|m| { check_rewind_remove_mark(|m| {
ShardTree::<String, usize, MemoryShardStore<String>, 4, 3>::new( ShardTree::<String, usize, MemoryShardStore<String, usize>, 4, 3>::new(
MemoryShardStore::empty(), MemoryShardStore::empty(),
m, m,
0, 0,
) )
.unwrap()
}); });
} }
@ -2776,11 +2965,11 @@ mod tests {
H, H,
usize, usize,
CompleteTree<H, usize, 4>, CompleteTree<H, usize, 4>,
ShardTree<H, usize, MemoryShardStore<H>, 4, 3>, ShardTree<H, usize, MemoryShardStore<H, usize>, 4, 3>,
> { > {
CombinedTree::new( CombinedTree::new(
CompleteTree::new(max_checkpoints, 0), CompleteTree::new(max_checkpoints, 0),
ShardTree::new(MemoryShardStore::empty(), max_checkpoints, 0), ShardTree::new(MemoryShardStore::empty(), max_checkpoints, 0).unwrap(),
) )
} }