simplifies ShredIndex api (#21932)

This commit is contained in:
behzad nouri 2021-12-16 19:17:32 +00:00 committed by GitHub
parent e83ca4bb28
commit efd64a3862
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 54 deletions

View File

@ -637,7 +637,7 @@ impl Blockstore {
if let Some(shred) = prev_inserted_shreds.get(&key) { if let Some(shred) = prev_inserted_shreds.get(&key) {
return Some(shred.clone()); return Some(shred.clone());
} }
if !index.data().is_present(i) { if !index.data().contains(i) {
return None; return None;
} }
match data_cf.get_bytes((slot, i)).unwrap() { match data_cf.get_bytes((slot, i)).unwrap() {
@ -662,7 +662,7 @@ impl Blockstore {
if let Some(shred) = prev_inserted_shreds.get(&key) { if let Some(shred) = prev_inserted_shreds.get(&key) {
return Some(shred.clone()); return Some(shred.clone());
} }
if !index.coding().is_present(i) { if !index.coding().contains(i) {
return None; return None;
} }
match code_cf.get_bytes((slot, i)).unwrap() { match code_cf.get_bytes((slot, i)).unwrap() {
@ -1058,7 +1058,7 @@ impl Blockstore {
// So, all coding shreds in a given FEC block will have the same set index // So, all coding shreds in a given FEC block will have the same set index
if !is_trusted { if !is_trusted {
if index_meta.coding().is_present(shred_index) { if index_meta.coding().contains(shred_index) {
metrics.num_coding_shreds_exists += 1; metrics.num_coding_shreds_exists += 1;
handle_duplicate(shred); handle_duplicate(shred);
return false; return false;
@ -1304,7 +1304,7 @@ impl Blockstore {
// Commit step: commit all changes to the mutable structures at once, or none at all. // Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through. // We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &shred.payload)?; write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &shred.payload)?;
index_meta.coding_mut().set_present(shred_index, true); index_meta.coding_mut().insert(shred_index);
Ok(()) Ok(())
} }
@ -1312,7 +1312,7 @@ impl Blockstore {
fn is_data_shred_present(shred: &Shred, slot_meta: &SlotMeta, data_index: &ShredIndex) -> bool { fn is_data_shred_present(shred: &Shred, slot_meta: &SlotMeta, data_index: &ShredIndex) -> bool {
let shred_index = u64::from(shred.index()); let shred_index = u64::from(shred.index());
// Check that the shred doesn't already exist in blockstore // Check that the shred doesn't already exist in blockstore
shred_index < slot_meta.consumed || data_index.is_present(shred_index) shred_index < slot_meta.consumed || data_index.contains(shred_index)
} }
fn get_data_shred_from_just_inserted_or_db<'a>( fn get_data_shred_from_just_inserted_or_db<'a>(
@ -1498,7 +1498,7 @@ impl Blockstore {
let new_consumed = if slot_meta.consumed == index { let new_consumed = if slot_meta.consumed == index {
let mut current_index = index + 1; let mut current_index = index + 1;
while data_index.is_present(current_index) { while data_index.contains(current_index) {
current_index += 1; current_index += 1;
} }
current_index current_index
@ -1514,7 +1514,7 @@ impl Blockstore {
// But only need to store the bytes within data_header.size // But only need to store the bytes within data_header.size
&shred.payload[..shred.data_header.size as usize], &shred.payload[..shred.data_header.size as usize],
)?; )?;
data_index.set_present(index, true); data_index.insert(index);
let newly_completed_data_sets = update_slot_meta( let newly_completed_data_sets = update_slot_meta(
last_in_slot, last_in_slot,
last_in_data, last_in_data,
@ -3208,7 +3208,7 @@ fn update_completed_data_indexes(
.filter(|ix| { .filter(|ix| {
let (begin, end) = (ix[0] as u64, ix[1] as u64); let (begin, end) = (ix[0] as u64, ix[1] as u64);
let num_shreds = (end - begin) as usize; let num_shreds = (end - begin) as usize;
received_data_shreds.present_in_bounds(begin..end) == num_shreds received_data_shreds.range(begin..end).count() == num_shreds
}) })
.map(|ix| (ix[0], ix[1] - 1)) .map(|ix| (ix[0], ix[1] - 1))
.collect() .collect()
@ -8131,7 +8131,7 @@ pub mod tests {
// Test that iterator and individual shred lookup yield same set // Test that iterator and individual shred lookup yield same set
assert!(blockstore.get_data_shred(slot, index).unwrap().is_some()); assert!(blockstore.get_data_shred(slot, index).unwrap().is_some());
// Test that the data index has current shred accounted for // Test that the data index has current shred accounted for
assert!(shred_index.data().is_present(index)); assert!(shred_index.data().contains(index));
} }
// Test the data index doesn't have anything extra // Test the data index doesn't have anything extra
@ -8145,7 +8145,7 @@ pub mod tests {
// Test that the iterator and individual shred lookup yield same set // Test that the iterator and individual shred lookup yield same set
assert!(blockstore.get_coding_shred(slot, index).unwrap().is_some()); assert!(blockstore.get_coding_shred(slot, index).unwrap().is_some());
// Test that the coding index has current shred accounted for // Test that the coding index has current shred accounted for
assert!(shred_index.coding().is_present(index)); assert!(shred_index.coding().contains(index));
} }
// Test the data index doesn't have anything extra // Test the data index doesn't have anything extra
@ -8255,7 +8255,7 @@ pub mod tests {
let mut shred_index = ShredIndex::default(); let mut shred_index = ShredIndex::default();
for i in 0..10 { for i in 0..10 {
shred_index.set_present(i as u64, true); shred_index.insert(i as u64);
assert_eq!( assert_eq!(
update_completed_data_indexes(true, i, &shred_index, &mut completed_data_indexes), update_completed_data_indexes(true, i, &shred_index, &mut completed_data_indexes),
vec![(i, i)] vec![(i, i)]
@ -8269,21 +8269,21 @@ pub mod tests {
let mut completed_data_indexes = BTreeSet::default(); let mut completed_data_indexes = BTreeSet::default();
let mut shred_index = ShredIndex::default(); let mut shred_index = ShredIndex::default();
shred_index.set_present(4, true); shred_index.insert(4);
assert!( assert!(
update_completed_data_indexes(false, 4, &shred_index, &mut completed_data_indexes) update_completed_data_indexes(false, 4, &shred_index, &mut completed_data_indexes)
.is_empty() .is_empty()
); );
assert!(completed_data_indexes.is_empty()); assert!(completed_data_indexes.is_empty());
shred_index.set_present(2, true); shred_index.insert(2);
assert!( assert!(
update_completed_data_indexes(false, 2, &shred_index, &mut completed_data_indexes) update_completed_data_indexes(false, 2, &shred_index, &mut completed_data_indexes)
.is_empty() .is_empty()
); );
assert!(completed_data_indexes.is_empty()); assert!(completed_data_indexes.is_empty());
shred_index.set_present(3, true); shred_index.insert(3);
assert!( assert!(
update_completed_data_indexes(true, 3, &shred_index, &mut completed_data_indexes) update_completed_data_indexes(true, 3, &shred_index, &mut completed_data_indexes)
.is_empty() .is_empty()
@ -8292,7 +8292,7 @@ pub mod tests {
// Inserting data complete shred 1 now confirms the range of shreds [2, 3] // Inserting data complete shred 1 now confirms the range of shreds [2, 3]
// is part of the same data set // is part of the same data set
shred_index.set_present(1, true); shred_index.insert(1);
assert_eq!( assert_eq!(
update_completed_data_indexes(true, 1, &shred_index, &mut completed_data_indexes), update_completed_data_indexes(true, 1, &shred_index, &mut completed_data_indexes),
vec![(2, 3)] vec![(2, 3)]
@ -8301,7 +8301,7 @@ pub mod tests {
// Inserting data complete shred 0 now confirms the range of shreds [0] // Inserting data complete shred 0 now confirms the range of shreds [0]
// is part of the same data set // is part of the same data set
shred_index.set_present(0, true); shred_index.insert(0);
assert_eq!( assert_eq!(
update_completed_data_indexes(true, 0, &shred_index, &mut completed_data_indexes), update_completed_data_indexes(true, 0, &shred_index, &mut completed_data_indexes),
vec![(0, 0), (1, 1)] vec![(0, 0), (1, 1)]
@ -8601,7 +8601,7 @@ pub mod tests {
assert_eq!(meta.consumed, shreds.len() as u64); assert_eq!(meta.consumed, shreds.len() as u64);
let shreds_index = blockstore.get_index(slot).unwrap().unwrap(); let shreds_index = blockstore.get_index(slot).unwrap().unwrap();
for i in 0..shreds.len() as u64 { for i in 0..shreds.len() as u64 {
assert!(shreds_index.data().is_present(i)); assert!(shreds_index.data().contains(i));
} }
// Cleanup the slot // Cleanup the slot

View File

@ -151,10 +151,10 @@ impl Index {
&self.coding &self.coding
} }
pub fn data_mut(&mut self) -> &mut ShredIndex { pub(crate) fn data_mut(&mut self) -> &mut ShredIndex {
&mut self.data &mut self.data
} }
pub fn coding_mut(&mut self) -> &mut ShredIndex { pub(crate) fn coding_mut(&mut self) -> &mut ShredIndex {
&mut self.coding &mut self.coding
} }
} }
@ -164,30 +164,19 @@ impl ShredIndex {
self.index.len() self.index.len()
} }
pub fn present_in_bounds(&self, bounds: impl RangeBounds<u64>) -> usize { pub(crate) fn range<R>(&self, bounds: R) -> impl Iterator<Item = &u64>
self.index.range(bounds).count() where
R: RangeBounds<u64>,
{
self.index.range(bounds)
} }
pub fn is_present(&self, index: u64) -> bool { pub(crate) fn contains(&self, index: u64) -> bool {
self.index.contains(&index) self.index.contains(&index)
} }
pub fn set_present(&mut self, index: u64, presence: bool) { pub(crate) fn insert(&mut self, index: u64) {
if presence {
self.index.insert(index); self.index.insert(index);
} else {
self.index.remove(&index);
}
}
pub fn set_many_present(&mut self, presence: impl IntoIterator<Item = (u64, bool)>) {
for (idx, present) in presence.into_iter() {
self.set_present(idx, present);
}
}
pub fn largest(&self) -> Option<u64> {
self.index.iter().rev().next().copied()
} }
} }
@ -301,10 +290,8 @@ impl ErasureMeta {
pub(crate) fn status(&self, index: &Index) -> ErasureMetaStatus { pub(crate) fn status(&self, index: &Index) -> ErasureMetaStatus {
use ErasureMetaStatus::*; use ErasureMetaStatus::*;
let num_coding = index let num_coding = index.coding().range(self.coding_shreds_indices()).count();
.coding() let num_data = index.data().range(self.data_shreds_indices()).count();
.present_in_bounds(self.coding_shreds_indices());
let num_data = index.data().present_in_bounds(self.data_shreds_indices());
let (data_missing, num_needed) = ( let (data_missing, num_needed) = (
self.config.num_data().saturating_sub(num_data), self.config.num_data().saturating_sub(num_data),
@ -355,7 +342,6 @@ mod test {
use { use {
super::*, super::*,
rand::{seq::SliceRandom, thread_rng}, rand::{seq::SliceRandom, thread_rng},
std::iter::repeat,
}; };
#[test] #[test]
@ -379,35 +365,35 @@ mod test {
assert_eq!(e_meta.status(&index), StillNeed(erasure_config.num_data())); assert_eq!(e_meta.status(&index), StillNeed(erasure_config.num_data()));
index for ix in data_indexes.clone() {
.data_mut() index.data_mut().insert(ix);
.set_many_present(data_indexes.clone().zip(repeat(true))); }
assert_eq!(e_meta.status(&index), DataFull); assert_eq!(e_meta.status(&index), DataFull);
index for ix in coding_indexes.clone() {
.coding_mut() index.coding_mut().insert(ix);
.set_many_present(coding_indexes.clone().zip(repeat(true))); }
for &idx in data_indexes for &idx in data_indexes
.clone() .clone()
.collect::<Vec<_>>() .collect::<Vec<_>>()
.choose_multiple(&mut rng, erasure_config.num_data()) .choose_multiple(&mut rng, erasure_config.num_data())
{ {
index.data_mut().set_present(idx, false); index.data_mut().index.remove(&idx);
assert_eq!(e_meta.status(&index), CanRecover); assert_eq!(e_meta.status(&index), CanRecover);
} }
index for ix in data_indexes {
.data_mut() index.data_mut().insert(ix);
.set_many_present(data_indexes.zip(repeat(true))); }
for &idx in coding_indexes for &idx in coding_indexes
.collect::<Vec<_>>() .collect::<Vec<_>>()
.choose_multiple(&mut rng, erasure_config.num_coding()) .choose_multiple(&mut rng, erasure_config.num_coding())
{ {
index.coding_mut().set_present(idx, false); index.coding_mut().index.remove(&idx);
assert_eq!(e_meta.status(&index), DataFull); assert_eq!(e_meta.status(&index), DataFull);
} }