Invert the relationship between shardtree errors and storage errors.

This commit is contained in:
Kris Nuttycombe 2023-05-05 19:01:05 -06:00
parent 0e502e3832
commit 247b38f336
1 changed files with 80 additions and 104 deletions

View File

@ -1,6 +1,6 @@
use bitflags::bitflags; use bitflags::bitflags;
use core::convert::{Infallible, TryFrom}; use core::convert::TryFrom;
use core::fmt::{self, Debug, Display}; use core::fmt::{self, Debug};
use core::marker::PhantomData; use core::marker::PhantomData;
use core::ops::{Deref, Range}; use core::ops::{Deref, Range};
use either::Either; use either::Either;
@ -638,7 +638,7 @@ pub struct BatchInsertionResult<H, C: Ord, I: Iterator<Item = (H, Retention<C>)>
/// An error prevented the insertion of values into the subtree. /// An error prevented the insertion of values into the subtree.
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub enum InsertionError<S> { pub enum InsertionError {
/// The caller attempted to insert a subtree into a tree that does not contain /// The caller attempted to insert a subtree into a tree that does not contain
/// the subtree's root address. /// the subtree's root address.
NotContained(Address), NotContained(Address),
@ -653,11 +653,9 @@ pub enum InsertionError<S> {
CheckpointOutOfOrder, CheckpointOutOfOrder,
/// An append operation has exceeded the capacity of the tree. /// An append operation has exceeded the capacity of the tree.
TreeFull, TreeFull,
/// An error was produced by the underlying [`ShardStore`]
Storage(S),
} }
impl<S: Display> fmt::Display for InsertionError<S> { impl fmt::Display for InsertionError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match &self { match &self {
InsertionError::NotContained(addr) => { InsertionError::NotContained(addr) => {
@ -679,11 +677,6 @@ impl<S: Display> fmt::Display for InsertionError<S> {
write!(f, "Cannot append out-of-order checkpoint identifier.") write!(f, "Cannot append out-of-order checkpoint identifier.")
} }
InsertionError::TreeFull => write!(f, "Note commitment tree is full."), InsertionError::TreeFull => write!(f, "Note commitment tree is full."),
InsertionError::Storage(e) => write!(
f,
"An error occurred persisting tree data to storage: {}",
e
),
} }
} }
} }
@ -902,21 +895,21 @@ impl<H: Hashable + Clone + PartialEq> LocatedPrunableTree<H> {
/// error if the specified subtree's root address is not in the range of valid descendants of /// error if the specified subtree's root address is not in the range of valid descendants of
/// the root node of this tree or if the insertion would result in a conflict between computed /// the root node of this tree or if the insertion would result in a conflict between computed
/// root hashes of complete subtrees. /// root hashes of complete subtrees.
pub fn insert_subtree<E>( pub fn insert_subtree(
&self, &self,
subtree: Self, subtree: Self,
contains_marked: bool, contains_marked: bool,
) -> Result<(Self, Vec<IncompleteAt>), InsertionError<E>> { ) -> Result<(Self, Vec<IncompleteAt>), InsertionError> {
// A function to recursively dig into the tree, creating a path downward and introducing // A function to recursively dig into the tree, creating a path downward and introducing
// empty nodes as necessary until we can insert the provided subtree. // empty nodes as necessary until we can insert the provided subtree.
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
fn go<H: Hashable + Clone + PartialEq, E>( fn go<H: Hashable + Clone + PartialEq>(
root_addr: Address, root_addr: Address,
into: &PrunableTree<H>, into: &PrunableTree<H>,
subtree: LocatedPrunableTree<H>, subtree: LocatedPrunableTree<H>,
is_complete: bool, is_complete: bool,
contains_marked: bool, contains_marked: bool,
) -> Result<(PrunableTree<H>, Vec<IncompleteAt>), InsertionError<E>> { ) -> Result<(PrunableTree<H>, Vec<IncompleteAt>), InsertionError> {
// In the case that we are replacing a node entirely, we need to extend the // In the case that we are replacing a node entirely, we need to extend the
// subtree up to the level of the node being replaced, adding Nil siblings // subtree up to the level of the node being replaced, adding Nil siblings
// and recording the presence of those incomplete nodes when necessary // and recording the presence of those incomplete nodes when necessary
@ -1045,11 +1038,11 @@ impl<H: Hashable + Clone + PartialEq> LocatedPrunableTree<H> {
/// Prefer to use [`Self::batch_append`] or [`Self::batch_insert`] when appending multiple /// Prefer to use [`Self::batch_append`] or [`Self::batch_insert`] when appending multiple
/// values, as these operations require fewer traversals of the tree than are necessary when /// values, as these operations require fewer traversals of the tree than are necessary when
/// performing multiple sequential calls to [`Self::append`]. /// performing multiple sequential calls to [`Self::append`].
pub fn append<C: Clone + Ord, E>( pub fn append<C: Clone + Ord>(
&self, &self,
value: H, value: H,
retention: Retention<C>, retention: Retention<C>,
) -> Result<(Self, Position, Option<C>), InsertionError<E>> { ) -> Result<(Self, Position, Option<C>), InsertionError> {
let checkpoint_id = if let Retention::Checkpoint { id, .. } = &retention { let checkpoint_id = if let Retention::Checkpoint { id, .. } = &retention {
Some(id.clone()) Some(id.clone())
} else { } else {
@ -1073,10 +1066,10 @@ impl<H: Hashable + Clone + PartialEq> LocatedPrunableTree<H> {
/// Returns an error if the tree is full. If the position at the end of the iterator is outside /// Returns an error if the tree is full. If the position at the end of the iterator is outside
/// of the subtree's range, the unconsumed part of the iterator will be returned as part of /// of the subtree's range, the unconsumed part of the iterator will be returned as part of
/// the result. /// the result.
pub fn batch_append<C: Clone + Ord, I: Iterator<Item = (H, Retention<C>)>, E>( pub fn batch_append<C: Clone + Ord, I: Iterator<Item = (H, Retention<C>)>>(
&self, &self,
values: I, values: I,
) -> Result<Option<BatchInsertionResult<H, C, I>>, InsertionError<E>> { ) -> Result<Option<BatchInsertionResult<H, C, I>>, InsertionError> {
let append_position = self let append_position = self
.max_position() .max_position()
.map(|p| p + 1) .map(|p| p + 1)
@ -1296,11 +1289,11 @@ impl<H: Hashable + Clone + PartialEq> LocatedPrunableTree<H> {
/// Returns `Ok(None)` if the provided iterator is empty, `Ok(Some<BatchInsertionResult>)` if /// Returns `Ok(None)` if the provided iterator is empty, `Ok(Some<BatchInsertionResult>)` if
/// values were successfully inserted, or an error if the start position provided is outside /// values were successfully inserted, or an error if the start position provided is outside
/// of this tree's position range or if a conflict with an existing subtree root is detected. /// of this tree's position range or if a conflict with an existing subtree root is detected.
pub fn batch_insert<C: Clone + Ord, I: Iterator<Item = (H, Retention<C>)>, E>( pub fn batch_insert<C: Clone + Ord, I: Iterator<Item = (H, Retention<C>)>>(
&self, &self,
start: Position, start: Position,
values: I, values: I,
) -> Result<Option<BatchInsertionResult<H, C, I>>, InsertionError<E>> { ) -> Result<Option<BatchInsertionResult<H, C, I>>, InsertionError> {
let subtree_range = self.root_addr.position_range(); let subtree_range = self.root_addr.position_range();
let contains_start = subtree_range.contains(&start); let contains_start = subtree_range.contains(&start);
if contains_start { if contains_start {
@ -1505,6 +1498,7 @@ pub trait ShardStore<H, C> {
impl<H, C, S: ShardStore<H, C>> ShardStore<H, C> for &mut S { impl<H, C, S: ShardStore<H, C>> ShardStore<H, C> for &mut S {
type Error = S::Error; type Error = S::Error;
fn get_shard(&self, shard_root: Address) -> Option<&LocatedPrunableTree<H>> { fn get_shard(&self, shard_root: Address) -> Option<&LocatedPrunableTree<H>> {
S::get_shard(*self, shard_root) S::get_shard(*self, shard_root)
} }
@ -1592,7 +1586,7 @@ impl<H, C: Ord> MemoryShardStore<H, C> {
} }
impl<H, C: Ord> ShardStore<H, C> for MemoryShardStore<H, C> { impl<H, C: Ord> ShardStore<H, C> for MemoryShardStore<H, C> {
type Error = Infallible; type Error = InsertionError;
fn get_shard(&self, shard_root: Address) -> Option<&LocatedPrunableTree<H>> { fn get_shard(&self, shard_root: Address) -> Option<&LocatedPrunableTree<H>> {
let shard_idx = let shard_idx =
@ -1788,7 +1782,10 @@ impl<
/// already exists at this address, its root will be annotated with the specified hash value. /// already exists at this address, its root will be annotated with the specified hash value.
/// ///
/// This will return an error if the specified hash conflicts with any existing annotation. /// This will return an error if the specified hash conflicts with any existing annotation.
pub fn put_root(&mut self, addr: Address, value: H) -> Result<(), InsertionError<S::Error>> { pub fn put_root(&mut self, addr: Address, value: H) -> Result<(), S::Error>
where
S::Error: From<InsertionError>,
{
let updated_subtree = match self.store.get_shard(addr) { let updated_subtree = match self.store.get_shard(addr) {
Some(s) if !s.root.is_nil() => s.root.node_value().map_or_else( Some(s) if !s.root.is_nil() => s.root.node_value().map_or_else(
|| { || {
@ -1819,7 +1816,7 @@ impl<
}?; }?;
if let Some(s) = updated_subtree { if let Some(s) = updated_subtree {
self.store.put_shard(s).map_err(InsertionError::Storage)?; self.store.put_shard(s)?;
} }
Ok(()) Ok(())
@ -1830,17 +1827,13 @@ impl<
/// Prefer to use [`Self::batch_insert`] when appending multiple values, as these operations /// Prefer to use [`Self::batch_insert`] when appending multiple values, as these operations
/// require fewer traversals of the tree than are necessary when performing multiple sequential /// require fewer traversals of the tree than are necessary when performing multiple sequential
/// calls to [`Self::append`]. /// calls to [`Self::append`].
pub fn append( pub fn append(&mut self, value: H, retention: Retention<C>) -> Result<(), S::Error>
&mut self,
value: H,
retention: Retention<C>,
) -> Result<(), InsertionError<S::Error>>
where where
S::Error: Debug, S::Error: From<InsertionError>,
{ {
if let Retention::Checkpoint { id, .. } = &retention { if let Retention::Checkpoint { id, .. } = &retention {
if self.store.max_checkpoint_id() >= Some(id) { if self.store.max_checkpoint_id() >= Some(id) {
return Err(InsertionError::CheckpointOutOfOrder); return Err(InsertionError::CheckpointOutOfOrder.into());
} }
} }
@ -1850,7 +1843,7 @@ impl<
let addr = subtree.root_addr; let addr = subtree.root_addr;
if addr.index() + 1 >= 0x1 << (SHARD_HEIGHT - 1) { if addr.index() + 1 >= 0x1 << (SHARD_HEIGHT - 1) {
return Err(InsertionError::TreeFull); return Err(InsertionError::TreeFull.into());
} else { } else {
LocatedTree::empty(addr.next_at_level()).append(value, retention)? LocatedTree::empty(addr.next_at_level()).append(value, retention)?
} }
@ -1862,17 +1855,13 @@ impl<
LocatedTree::empty(root_addr).append(value, retention)? LocatedTree::empty(root_addr).append(value, retention)?
}; };
self.store self.store.put_shard(append_result)?;
.put_shard(append_result)
.map_err(InsertionError::Storage)?;
if let Some(c) = checkpoint_id { if let Some(c) = checkpoint_id {
self.store self.store
.add_checkpoint(c, Checkpoint::at_position(position)) .add_checkpoint(c, Checkpoint::at_position(position))?;
.map_err(InsertionError::Storage)?;
} }
self.prune_excess_checkpoints() self.prune_excess_checkpoints()?;
.map_err(InsertionError::Storage)?;
Ok(()) Ok(())
} }
@ -1896,9 +1885,9 @@ impl<
&mut self, &mut self,
mut start: Position, mut start: Position,
values: I, values: I,
) -> Result<Option<(Position, Vec<IncompleteAt>)>, InsertionError<S::Error>> ) -> Result<Option<(Position, Vec<IncompleteAt>)>, S::Error>
where where
S::Error: Debug, S::Error: From<InsertionError>,
{ {
let mut values = values.peekable(); let mut values = values.peekable();
let mut subtree_root_addr = Address::above_position(Self::subtree_level(), start); let mut subtree_root_addr = Address::above_position(Self::subtree_level(), start);
@ -1915,13 +1904,10 @@ impl<
.expect( .expect(
"Iterator containing leaf values to insert was verified to be nonempty.", "Iterator containing leaf values to insert was verified to be nonempty.",
); );
self.store self.store.put_shard(res.subtree)?;
.put_shard(res.subtree)
.map_err(InsertionError::Storage)?;
for (id, position) in res.checkpoints.into_iter() { for (id, position) in res.checkpoints.into_iter() {
self.store self.store
.add_checkpoint(id, Checkpoint::at_position(position)) .add_checkpoint(id, Checkpoint::at_position(position))?;
.map_err(InsertionError::Storage)?;
} }
values = res.remainder; values = res.remainder;
@ -1934,8 +1920,7 @@ impl<
} }
} }
self.prune_excess_checkpoints() self.prune_excess_checkpoints()?;
.map_err(InsertionError::Storage)?;
Ok(max_insert_position.map(|p| (p, all_incomplete))) Ok(max_insert_position.map(|p| (p, all_incomplete)))
} }
@ -1944,7 +1929,10 @@ impl<
pub fn insert_tree( pub fn insert_tree(
&mut self, &mut self,
tree: LocatedPrunableTree<H>, tree: LocatedPrunableTree<H>,
) -> Result<Vec<IncompleteAt>, InsertionError<S::Error>> { ) -> Result<Vec<IncompleteAt>, S::Error>
where
S::Error: From<InsertionError>,
{
let mut all_incomplete = vec![]; let mut all_incomplete = vec![];
for subtree in tree.decompose_to_level(Self::subtree_level()).into_iter() { for subtree in tree.decompose_to_level(Self::subtree_level()).into_iter() {
let root_addr = subtree.root_addr; let root_addr = subtree.root_addr;
@ -1955,18 +1943,16 @@ impl<
.get_shard(root_addr) .get_shard(root_addr)
.unwrap_or(&empty) .unwrap_or(&empty)
.insert_subtree(subtree, contains_marked)?; .insert_subtree(subtree, contains_marked)?;
self.store self.store.put_shard(new_subtree)?;
.put_shard(new_subtree)
.map_err(InsertionError::Storage)?;
all_incomplete.append(&mut incomplete); all_incomplete.append(&mut incomplete);
} }
Ok(all_incomplete) Ok(all_incomplete)
} }
/// Adds a checkpoint at the rightmost leaf state of the tree. /// Adds a checkpoint at the rightmost leaf state of the tree.
pub fn checkpoint(&mut self, checkpoint_id: C) -> Result<bool, InsertionError<S::Error>> pub fn checkpoint(&mut self, checkpoint_id: C) -> Result<bool, S::Error>
where where
S::Error: Debug, S::Error: From<InsertionError>,
{ {
fn go<H: Hashable + Clone + PartialEq>( fn go<H: Hashable + Clone + PartialEq>(
root_addr: Address, root_addr: Address,
@ -2037,32 +2023,25 @@ impl<
return Ok(false); return Ok(false);
} }
self.store self.store
.add_checkpoint(checkpoint_id, Checkpoint::at_position(checkpoint_position)) .add_checkpoint(checkpoint_id, Checkpoint::at_position(checkpoint_position))?;
.map_err(InsertionError::Storage)?;
// early return once we've updated the tree state // early return once we've updated the tree state
self.prune_excess_checkpoints() self.prune_excess_checkpoints()?;
.map_err(InsertionError::Storage)?;
return Ok(true); return Ok(true);
} }
} }
self.store self.store
.add_checkpoint(checkpoint_id, Checkpoint::tree_empty()) .add_checkpoint(checkpoint_id, Checkpoint::tree_empty())?;
.map_err(InsertionError::Storage)?;
// TODO: it should not be necessary to do this on every checkpoint, // TODO: it should not be necessary to do this on every checkpoint,
// but currently that's how the reference tree behaves so we're maintaining // but currently that's how the reference tree behaves so we're maintaining
// those semantics for test compatibility. // those semantics for test compatibility.
self.prune_excess_checkpoints() self.prune_excess_checkpoints()?;
.map_err(InsertionError::Storage)?;
Ok(true) Ok(true)
} }
fn prune_excess_checkpoints(&mut self) -> Result<(), S::Error> fn prune_excess_checkpoints(&mut self) -> Result<(), S::Error> {
where
S::Error: Debug,
{
let checkpoint_count = self.store.checkpoint_count()?; let checkpoint_count = self.store.checkpoint_count()?;
if checkpoint_count > self.max_checkpoints { if checkpoint_count > self.max_checkpoints {
// Batch removals by subtree & create a list of the checkpoint identifiers that // Batch removals by subtree & create a list of the checkpoint identifiers that
@ -2070,39 +2049,37 @@ impl<
let mut checkpoints_to_delete = vec![]; let mut checkpoints_to_delete = vec![];
let mut clear_positions: BTreeMap<Address, BTreeMap<Position, RetentionFlags>> = let mut clear_positions: BTreeMap<Address, BTreeMap<Position, RetentionFlags>> =
BTreeMap::new(); BTreeMap::new();
self.store self.store.with_checkpoints(
.with_checkpoints( checkpoint_count - self.max_checkpoints,
checkpoint_count - self.max_checkpoints, |cid, checkpoint| {
|cid, checkpoint| { checkpoints_to_delete.push(cid.clone());
checkpoints_to_delete.push(cid.clone());
let mut clear_at = |pos, flags_to_clear| { let mut clear_at = |pos, flags_to_clear| {
let subtree_addr = Address::above_position(Self::subtree_level(), pos); let subtree_addr = Address::above_position(Self::subtree_level(), pos);
clear_positions clear_positions
.entry(subtree_addr) .entry(subtree_addr)
.and_modify(|to_clear| { .and_modify(|to_clear| {
to_clear to_clear
.entry(pos) .entry(pos)
.and_modify(|flags| *flags |= flags_to_clear) .and_modify(|flags| *flags |= flags_to_clear)
.or_insert(flags_to_clear); .or_insert(flags_to_clear);
}) })
.or_insert_with(|| BTreeMap::from([(pos, flags_to_clear)])); .or_insert_with(|| BTreeMap::from([(pos, flags_to_clear)]));
}; };
// clear the checkpoint leaf // clear the checkpoint leaf
if let TreeState::AtPosition(pos) = checkpoint.tree_state { if let TreeState::AtPosition(pos) = checkpoint.tree_state {
clear_at(pos, RetentionFlags::CHECKPOINT) clear_at(pos, RetentionFlags::CHECKPOINT)
} }
// clear the leaves that have been marked for removal // clear the leaves that have been marked for removal
for unmark_pos in checkpoint.marks_removed.iter() { for unmark_pos in checkpoint.marks_removed.iter() {
clear_at(*unmark_pos, RetentionFlags::MARKED) clear_at(*unmark_pos, RetentionFlags::MARKED)
} }
Ok(()) Ok(())
}, },
) )?;
.expect("The provided function is infallible.");
// Prune each affected subtree // Prune each affected subtree
for (subtree_addr, positions) in clear_positions.into_iter() { for (subtree_addr, positions) in clear_positions.into_iter() {
@ -2465,11 +2442,10 @@ pub mod testing {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::{ use crate::{
IncompleteAt, LocatedPrunableTree, LocatedTree, MemoryShardStore, Node, PrunableTree, IncompleteAt, InsertionError, LocatedPrunableTree, LocatedTree, MemoryShardStore, Node,
QueryError, RetentionFlags, ShardStore, ShardTree, Tree, PrunableTree, QueryError, RetentionFlags, ShardStore, ShardTree, Tree,
}; };
use assert_matches::assert_matches; use assert_matches::assert_matches;
use core::convert::Infallible;
use incrementalmerkletree::{ use incrementalmerkletree::{
testing::{ testing::{
self, arb_operation, check_append, check_checkpoint_rewind, check_operations, self, arb_operation, check_append, check_checkpoint_rewind, check_operations,
@ -2569,7 +2545,7 @@ mod tests {
}; };
assert_eq!( assert_eq!(
t.insert_subtree::<Infallible>( t.insert_subtree(
LocatedTree { LocatedTree {
root_addr: Address::from_parts(1.into(), 6), root_addr: Address::from_parts(1.into(), 6),
root: parent(leaf(("e".to_string(), RetentionFlags::MARKED)), nil()) root: parent(leaf(("e".to_string(), RetentionFlags::MARKED)), nil())
@ -2746,20 +2722,20 @@ mod tests {
fn located_prunable_tree_insert() { fn located_prunable_tree_insert() {
let tree = LocatedPrunableTree::empty(Address::from_parts(Level::from(2), 0)); let tree = LocatedPrunableTree::empty(Address::from_parts(Level::from(2), 0));
let (base, _, _) = tree let (base, _, _) = tree
.append::<(), Infallible>("a".to_string(), Retention::Ephemeral) .append::<()>("a".to_string(), Retention::Ephemeral)
.unwrap(); .unwrap();
assert_eq!(base.right_filled_root(), Ok("a___".to_string())); assert_eq!(base.right_filled_root(), Ok("a___".to_string()));
// Perform an in-order insertion. // Perform an in-order insertion.
let (in_order, pos, _) = base let (in_order, pos, _) = base
.append::<(), Infallible>("b".to_string(), Retention::Ephemeral) .append::<()>("b".to_string(), Retention::Ephemeral)
.unwrap(); .unwrap();
assert_eq!(pos, 1.into()); assert_eq!(pos, 1.into());
assert_eq!(in_order.right_filled_root(), Ok("ab__".to_string())); assert_eq!(in_order.right_filled_root(), Ok("ab__".to_string()));
// On the same tree, perform an out-of-order insertion. // On the same tree, perform an out-of-order insertion.
let out_of_order = base let out_of_order = base
.batch_insert::<(), _, Infallible>( .batch_insert::<(), _>(
Position::from(3), Position::from(3),
vec![("d".to_string(), Retention::Ephemeral)].into_iter(), vec![("d".to_string(), Retention::Ephemeral)].into_iter(),
) )
@ -2778,7 +2754,7 @@ mod tests {
let complete = out_of_order let complete = out_of_order
.subtree .subtree
.batch_insert::<(), _, Infallible>( .batch_insert::<(), _>(
Position::from(1), Position::from(1),
vec![ vec![
("b".to_string(), Retention::Ephemeral), ("b".to_string(), Retention::Ephemeral),
@ -2890,7 +2866,7 @@ mod tests {
const SHARD_HEIGHT: u8, const SHARD_HEIGHT: u8,
> testing::Tree<H, C> for ShardTree<H, C, S, DEPTH, SHARD_HEIGHT> > testing::Tree<H, C> for ShardTree<H, C, S, DEPTH, SHARD_HEIGHT>
where where
S::Error: core::fmt::Debug, S::Error: core::fmt::Debug + From<InsertionError>,
{ {
fn depth(&self) -> u8 { fn depth(&self) -> u8 {
DEPTH DEPTH