adds back ErasureMeta::first_coding_index field (#21623)
https://github.com/solana-labs/solana/pull/16646 removed first_coding_index since the field is currently redundant and always equal to fec_set_index. However, with upcoming changes to erasure coding schema, this will no longer be the same as fec_set_index and so requires a separate field to represent.
This commit is contained in:
parent
ec7e17787e
commit
49ba09b333
|
@ -10,7 +10,6 @@ use {
|
|||
IteratorMode, LedgerColumn, Result, WriteBatch,
|
||||
},
|
||||
blockstore_meta::*,
|
||||
erasure::ErasureConfig,
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
next_slots_iterator::NextSlotsIterator,
|
||||
shred::{Result as ShredResult, Shred, ShredType, Shredder, SHRED_PAYLOAD_SIZE},
|
||||
|
@ -1056,21 +1055,16 @@ impl Blockstore {
|
|||
}
|
||||
}
|
||||
|
||||
let set_index = u64::from(shred.common_header.fec_set_index);
|
||||
let erasure_config = ErasureConfig::new(
|
||||
shred.coding_header.num_data_shreds as usize,
|
||||
shred.coding_header.num_coding_shreds as usize,
|
||||
);
|
||||
|
||||
let set_index = u64::from(shred.fec_set_index());
|
||||
let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
|
||||
self.erasure_meta(slot, set_index)
|
||||
.expect("Expect database get to succeed")
|
||||
.unwrap_or_else(|| ErasureMeta::new(set_index, erasure_config))
|
||||
.unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap())
|
||||
});
|
||||
|
||||
// TODO: handle_duplicate is not invoked and so duplicate shreds are
|
||||
// not gossiped to the rest of cluster.
|
||||
if erasure_config != erasure_meta.config() {
|
||||
if !erasure_meta.check_coding_shred(&shred) {
|
||||
metrics.num_coding_shreds_invalid_erasure_config += 1;
|
||||
let conflicting_shred = self.find_conflicting_coding_shred(
|
||||
&shred,
|
||||
|
@ -1093,7 +1087,7 @@ impl Blockstore {
|
|||
warn!("Received multiple erasure configs for the same erasure set!!!");
|
||||
warn!(
|
||||
"Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
|
||||
slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), erasure_config
|
||||
slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config(), shred.coding_header,
|
||||
);
|
||||
|
||||
return false;
|
||||
|
@ -1223,7 +1217,7 @@ impl Blockstore {
|
|||
}
|
||||
}
|
||||
|
||||
let set_index = u64::from(shred.common_header.fec_set_index);
|
||||
let set_index = u64::from(shred.fec_set_index());
|
||||
let newly_completed_data_sets = self.insert_data_shred(
|
||||
slot_meta,
|
||||
index_meta.data_mut(),
|
||||
|
@ -5438,7 +5432,7 @@ pub mod tests {
|
|||
true, // is_last_in_slot
|
||||
0, // reference_tick
|
||||
shred5.common_header.version,
|
||||
shred5.common_header.fec_set_index,
|
||||
shred5.fec_set_index(),
|
||||
);
|
||||
assert!(blockstore.should_insert_data_shred(
|
||||
&empty_shred,
|
||||
|
@ -5654,7 +5648,7 @@ pub mod tests {
|
|||
DataShredHeader::default(),
|
||||
coding.clone(),
|
||||
);
|
||||
let index = coding_shred.index() - coding_shred.common_header.fec_set_index - 1;
|
||||
let index = coding_shred.index() - coding_shred.fec_set_index() - 1;
|
||||
coding_shred.set_index(index as u32);
|
||||
|
||||
assert!(!Blockstore::should_insert_coding_shred(
|
||||
|
@ -5684,8 +5678,7 @@ pub mod tests {
|
|||
DataShredHeader::default(),
|
||||
coding.clone(),
|
||||
);
|
||||
let num_coding_shreds =
|
||||
coding_shred.common_header.index - coding_shred.common_header.fec_set_index;
|
||||
let num_coding_shreds = coding_shred.index() - coding_shred.fec_set_index();
|
||||
coding_shred.coding_header.num_coding_shreds = num_coding_shreds as u16;
|
||||
assert!(!Blockstore::should_insert_coding_shred(
|
||||
&coding_shred,
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
use {
|
||||
crate::erasure::ErasureConfig,
|
||||
crate::{
|
||||
erasure::ErasureConfig,
|
||||
shred::{Shred, ShredType},
|
||||
},
|
||||
serde::{Deserialize, Serialize},
|
||||
solana_sdk::{clock::Slot, hash::Hash},
|
||||
std::{
|
||||
|
@ -56,9 +59,8 @@ pub struct ShredIndex {
|
|||
pub struct ErasureMeta {
|
||||
/// Which erasure set in the slot this is
|
||||
set_index: u64,
|
||||
/// Deprecated field.
|
||||
#[serde(rename = "first_coding_index")]
|
||||
__unused_first_coding_index: u64,
|
||||
/// First coding index in the FEC set
|
||||
first_coding_index: u64,
|
||||
/// Size of shards in this erasure set
|
||||
#[serde(rename = "size")]
|
||||
__unused_size: usize,
|
||||
|
@ -226,15 +228,41 @@ impl SlotMeta {
|
|||
}
|
||||
|
||||
impl ErasureMeta {
|
||||
pub(crate) fn new(set_index: u64, config: ErasureConfig) -> ErasureMeta {
|
||||
ErasureMeta {
|
||||
set_index,
|
||||
config,
|
||||
__unused_first_coding_index: 0,
|
||||
__unused_size: 0,
|
||||
pub(crate) fn from_coding_shred(shred: &Shred) -> Option<Self> {
|
||||
match shred.shred_type() {
|
||||
ShredType::Data => None,
|
||||
ShredType::Code => {
|
||||
let config = ErasureConfig::new(
|
||||
usize::from(shred.coding_header.num_data_shreds),
|
||||
usize::from(shred.coding_header.num_coding_shreds),
|
||||
);
|
||||
let first_coding_index = u64::from(shred.first_coding_index()?);
|
||||
let erasure_meta = ErasureMeta {
|
||||
set_index: u64::from(shred.fec_set_index()),
|
||||
config,
|
||||
first_coding_index,
|
||||
__unused_size: 0,
|
||||
};
|
||||
Some(erasure_meta)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if the erasure fields on the shred
|
||||
// are consistent with the erasure-meta.
|
||||
pub(crate) fn check_coding_shred(&self, shred: &Shred) -> bool {
|
||||
let mut other = match Self::from_coding_shred(shred) {
|
||||
Some(erasure_meta) => erasure_meta,
|
||||
None => return false,
|
||||
};
|
||||
other.__unused_size = self.__unused_size;
|
||||
// Ignore first_coding_index field for now to be backward compatible.
|
||||
// TODO remove this once cluster is upgraded to always populate
|
||||
// first_coding_index field.
|
||||
other.first_coding_index = self.first_coding_index;
|
||||
self == &other
|
||||
}
|
||||
|
||||
pub(crate) fn config(&self) -> ErasureConfig {
|
||||
self.config
|
||||
}
|
||||
|
@ -246,7 +274,16 @@ impl ErasureMeta {
|
|||
|
||||
pub(crate) fn coding_shreds_indices(&self) -> Range<u64> {
|
||||
let num_coding = self.config.num_coding() as u64;
|
||||
self.set_index..self.set_index + num_coding
|
||||
// first_coding_index == 0 may imply that the field is not populated.
|
||||
// self.set_index to be backward compatible.
|
||||
// TODO remove this once cluster is upgraded to always populate
|
||||
// first_coding_index field.
|
||||
let first_coding_index = if self.first_coding_index == 0 {
|
||||
self.set_index
|
||||
} else {
|
||||
self.first_coding_index
|
||||
};
|
||||
first_coding_index..first_coding_index + num_coding
|
||||
}
|
||||
|
||||
pub(crate) fn status(&self, index: &Index) -> ErasureMetaStatus {
|
||||
|
@ -316,7 +353,12 @@ mod test {
|
|||
let set_index = 0;
|
||||
let erasure_config = ErasureConfig::new(8, 16);
|
||||
|
||||
let e_meta = ErasureMeta::new(set_index, erasure_config);
|
||||
let e_meta = ErasureMeta {
|
||||
set_index,
|
||||
first_coding_index: set_index,
|
||||
config: erasure_config,
|
||||
__unused_size: 0,
|
||||
};
|
||||
let mut rng = thread_rng();
|
||||
let mut index = Index::new(0);
|
||||
|
||||
|
|
|
@ -53,18 +53,18 @@ pub struct ErasureConfig {
|
|||
}
|
||||
|
||||
impl ErasureConfig {
|
||||
pub fn new(num_data: usize, num_coding: usize) -> ErasureConfig {
|
||||
pub(crate) fn new(num_data: usize, num_coding: usize) -> ErasureConfig {
|
||||
ErasureConfig {
|
||||
num_data,
|
||||
num_coding,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn num_data(self) -> usize {
|
||||
pub(crate) fn num_data(self) -> usize {
|
||||
self.num_data
|
||||
}
|
||||
|
||||
pub fn num_coding(self) -> usize {
|
||||
pub(crate) fn num_coding(self) -> usize {
|
||||
self.num_coding
|
||||
}
|
||||
}
|
||||
|
|
|
@ -471,6 +471,15 @@ impl Shred {
|
|||
self.common_header.fec_set_index
|
||||
}
|
||||
|
||||
pub(crate) fn first_coding_index(&self) -> Option<u32> {
|
||||
match self.shred_type() {
|
||||
ShredType::Data => None,
|
||||
// TODO should be: self.index() - self.coding_header.position
|
||||
// once position field is populated.
|
||||
ShredType::Code => Some(self.fec_set_index()),
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if the shred passes sanity checks.
|
||||
pub(crate) fn sanitize(&self) -> bool {
|
||||
self.erasure_block_index().is_some()
|
||||
|
@ -883,7 +892,7 @@ impl Shredder {
|
|||
assert_eq!(fec_set_index, index);
|
||||
assert!(data.iter().all(|shred| shred.common_header.slot == slot
|
||||
&& shred.common_header.version == version
|
||||
&& shred.common_header.fec_set_index == fec_set_index));
|
||||
&& shred.fec_set_index() == fec_set_index));
|
||||
let num_data = data.len();
|
||||
let num_coding = if is_last_in_slot {
|
||||
(2 * MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
|
||||
|
@ -929,7 +938,7 @@ impl Shredder {
|
|||
Self::verify_consistent_shred_payload_sizes("try_recovery()", &shreds)?;
|
||||
let (slot, fec_set_index) = match shreds.first() {
|
||||
None => return Ok(Vec::default()),
|
||||
Some(shred) => (shred.slot(), shred.common_header.fec_set_index),
|
||||
Some(shred) => (shred.slot(), shred.fec_set_index()),
|
||||
};
|
||||
let (num_data_shreds, num_coding_shreds) = match shreds.iter().find(|shred| shred.is_code())
|
||||
{
|
||||
|
@ -939,9 +948,9 @@ impl Shredder {
|
|||
shred.coding_header.num_coding_shreds,
|
||||
),
|
||||
};
|
||||
debug_assert!(shreds.iter().all(
|
||||
|shred| shred.slot() == slot && shred.common_header.fec_set_index == fec_set_index
|
||||
));
|
||||
debug_assert!(shreds
|
||||
.iter()
|
||||
.all(|shred| shred.slot() == slot && shred.fec_set_index() == fec_set_index));
|
||||
debug_assert!(shreds
|
||||
.iter()
|
||||
.filter(|shred| shred.is_code())
|
||||
|
@ -1784,7 +1793,7 @@ pub mod tests {
|
|||
let max_per_block = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
|
||||
data_shreds.iter().enumerate().for_each(|(i, s)| {
|
||||
let expected_fec_set_index = start_index + ((i / max_per_block) * max_per_block) as u32;
|
||||
assert_eq!(s.common_header.fec_set_index, expected_fec_set_index);
|
||||
assert_eq!(s.fec_set_index(), expected_fec_set_index);
|
||||
});
|
||||
|
||||
coding_shreds.iter().enumerate().for_each(|(i, s)| {
|
||||
|
@ -1792,7 +1801,7 @@ pub mod tests {
|
|||
while expected_fec_set_index as usize > data_shreds.len() {
|
||||
expected_fec_set_index -= max_per_block as u32;
|
||||
}
|
||||
assert_eq!(s.common_header.fec_set_index, expected_fec_set_index);
|
||||
assert_eq!(s.fec_set_index(), expected_fec_set_index);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue