for_each_checkpoint

This commit is contained in:
Eric Tu 2024-09-10 10:12:30 -04:00
parent 0f76ad8009
commit eb3843e0cf
3 changed files with 30 additions and 0 deletions

View File

@ -116,6 +116,12 @@ pub trait ShardStore {
where where
F: FnMut(&Self::CheckpointId, &Checkpoint) -> Result<(), Self::Error>; F: FnMut(&Self::CheckpointId, &Checkpoint) -> Result<(), Self::Error>;
/// Calls the given callback for each checkpoint in `CheckpointId` order. This is
/// essentially the immutable version of `with_checkpoints`.
fn for_each_checkpoint<F>(&self, limit: usize, callback: F) -> Result<(), Self::Error>
where
F: Fn(&Self::CheckpointId, &Checkpoint) -> Result<(), Self::Error>;
/// Update the checkpoint having the given identifier by mutating it with the provided /// Update the checkpoint having the given identifier by mutating it with the provided
/// function, and persist the updated checkpoint to the data store. /// function, and persist the updated checkpoint to the data store.
/// ///
@ -218,6 +224,13 @@ impl<S: ShardStore> ShardStore for &mut S {
S::with_checkpoints(self, limit, callback) S::with_checkpoints(self, limit, callback)
} }
fn for_each_checkpoint<F>(&self, limit: usize, callback: F) -> Result<(), Self::Error>
where
F: Fn(&Self::CheckpointId, &Checkpoint) -> Result<(), Self::Error>,
{
S::for_each_checkpoint(self, limit, callback)
}
fn update_checkpoint_with<F>( fn update_checkpoint_with<F>(
&mut self, &mut self,
checkpoint_id: &Self::CheckpointId, checkpoint_id: &Self::CheckpointId,

View File

@ -184,6 +184,12 @@ where
{ {
self.cache.with_checkpoints(limit, callback) self.cache.with_checkpoints(limit, callback)
} }
fn for_each_checkpoint<F>(&self, limit: usize, callback: F) -> Result<(), Self::Error>
where
F: Fn(&Self::CheckpointId, &Checkpoint) -> Result<(), Self::Error>,
{
self.cache.for_each_checkpoint(limit, callback)
}
fn update_checkpoint_with<F>( fn update_checkpoint_with<F>(
&mut self, &mut self,

View File

@ -137,6 +137,17 @@ impl<H: Clone, C: Clone + Ord> ShardStore for MemoryShardStore<H, C> {
Ok(()) Ok(())
} }
fn for_each_checkpoint<F>(&self, limit: usize, callback: F) -> Result<(), Self::Error>
where
F: Fn(&C, &Checkpoint) -> Result<(), Self::Error>,
{
for (cid, checkpoint) in self.checkpoints.iter().take(limit) {
callback(cid, checkpoint)?
}
Ok(())
}
fn update_checkpoint_with<F>( fn update_checkpoint_with<F>(
&mut self, &mut self,
checkpoint_id: &C, checkpoint_id: &C,