Dynamic erasure set configuration (#5018)

* Use local erasure session to create/broadcast coding blobs

* Individual session for each recovery (as the config might be different)

* address review comments

* new constructors for session and coding generator

* unit test for dynamic erasure config
This commit is contained in:
Pankaj Garg 2019-07-11 13:58:33 -07:00 committed by GitHub
parent a191f3fd90
commit 4c90898f0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 314 additions and 84 deletions

View File

@ -2,7 +2,7 @@
//! Proof of History ledger as well as iterative read, append write, and random
//! access read to a persistent file-based ledger.
use crate::entry::Entry;
use crate::erasure::{self, Session};
use crate::erasure::{ErasureConfig, Session};
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use crate::result::{Error, Result};
@ -90,7 +90,6 @@ pub struct Blocktree {
orphans_cf: LedgerColumn<cf::Orphans>,
index_cf: LedgerColumn<cf::Index>,
batch_processor: Arc<RwLock<BatchProcessor>>,
session: Arc<erasure::Session>,
pub new_blobs_signals: Vec<SyncSender<bool>>,
pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>,
}
@ -144,9 +143,6 @@ impl Blocktree {
let orphans_cf = db.column();
let index_cf = db.column();
// setup erasure
let session = Arc::new(erasure::Session::default());
let db = Arc::new(db);
Ok(Blocktree {
@ -158,7 +154,6 @@ impl Blocktree {
erasure_meta_cf,
orphans_cf,
index_cf,
session,
new_blobs_signals: vec![],
batch_processor,
completed_slots_senders: vec![],
@ -328,11 +323,22 @@ impl Blocktree {
let mut slot_meta_working_set = HashMap::new();
let mut erasure_meta_working_set = HashMap::new();
let mut index_working_set = HashMap::new();
let mut erasure_config_opt = None;
for blob in new_blobs.iter() {
let blob = blob.borrow();
assert!(!blob.is_coding());
match erasure_config_opt {
Some(config) => {
if config != blob.erasure_config() {
// ToDo: This is a potential slashing condition
error!("Multiple erasure config for the same slot.");
}
}
None => erasure_config_opt = Some(blob.erasure_config()),
}
let blob_slot = blob.slot();
let _ = index_working_set.entry(blob_slot).or_insert_with(|| {
@ -342,7 +348,8 @@ impl Blocktree {
.unwrap_or_else(|| Index::new(blob_slot))
});
let set_index = ErasureMeta::set_index_for(blob.index());
let set_index =
ErasureMeta::set_index_for(blob.index(), erasure_config_opt.unwrap().num_data());
if let Some(erasure_meta) = self.erasure_meta_cf.get((blob_slot, set_index))? {
erasure_meta_working_set.insert((blob_slot, set_index), erasure_meta);
}
@ -350,12 +357,12 @@ impl Blocktree {
let recovered_data_opt = handle_recovery(
&self.db,
&self.session,
&erasure_meta_working_set,
&mut index_working_set,
&prev_inserted_blob_datas,
&mut prev_inserted_coding,
&mut write_batch,
&erasure_config_opt.unwrap_or_default(),
)?;
if let Some(recovered_data) = recovered_data_opt {
@ -540,13 +547,25 @@ impl Blocktree {
let mut prev_inserted_coding = HashMap::new();
let mut prev_inserted_blob_datas = HashMap::new();
let mut erasure_config_opt = None;
for blob_item in blobs {
let blob = blob_item.borrow();
assert!(blob.is_coding());
match erasure_config_opt {
Some(config) => {
if config != blob.erasure_config() {
// ToDo: This is a potential slashing condition
error!("Multiple erasure config for the same slot.");
}
}
None => erasure_config_opt = Some(blob.erasure_config()),
}
let (blob_slot, blob_index, blob_size) =
(blob.slot(), blob.index(), blob.size() as usize);
let set_index = blob_index / crate::erasure::NUM_CODING as u64;
let set_index = blob_index / blob.erasure_config().num_coding() as u64;
writebatch.put_bytes::<cf::Coding>(
(blob_slot, blob_index),
@ -566,7 +585,9 @@ impl Blocktree {
self.erasure_meta_cf
.get((blob_slot, set_index))
.expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::new(set_index))
.unwrap_or_else(|| {
ErasureMeta::new(set_index, &erasure_config_opt.unwrap())
})
});
// size should be the same for all coding blobs, else there's a bug
@ -581,12 +602,12 @@ impl Blocktree {
let recovered_data_opt = handle_recovery(
&self.db,
&self.session,
&erasure_metas,
&mut index_working_set,
&prev_inserted_blob_datas,
&mut prev_inserted_coding,
&mut writebatch,
&erasure_config_opt.unwrap_or_default(),
)?;
if let Some(recovered_data) = recovered_data_opt {
@ -1475,12 +1496,12 @@ fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option<SlotM
fn handle_recovery(
db: &Database,
session: &Session,
erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, Index>,
prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>,
prev_inserted_coding: &mut HashMap<(u64, u64), Blob>,
writebatch: &mut WriteBatch,
erasure_config: &ErasureConfig,
) -> Result<Option<Vec<Blob>>> {
use solana_sdk::signature::Signable;
@ -1491,12 +1512,12 @@ fn handle_recovery(
if let Some((mut data, coding)) = try_erasure_recover(
db,
session,
&erasure_meta,
index,
slot,
&prev_inserted_blob_datas,
&prev_inserted_coding,
erasure_config,
)? {
for blob in data.iter() {
debug!(
@ -1587,15 +1608,13 @@ fn handle_recovery(
/// Attempts recovery using erasure coding
fn try_erasure_recover(
db: &Database,
session: &Session,
erasure_meta: &ErasureMeta,
index: &Index,
slot: u64,
prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>,
prev_inserted_coding: &HashMap<(u64, u64), Blob>,
erasure_config: &ErasureConfig,
) -> Result<Option<(Vec<Blob>, Vec<Blob>)>> {
use crate::erasure::ERASURE_SET_SIZE;
let set_index = erasure_meta.set_index;
let start_index = erasure_meta.start_index();
let (data_end_index, coding_end_idx) = erasure_meta.end_indexes();
@ -1613,14 +1632,16 @@ fn try_erasure_recover(
let blobs = match erasure_meta.status(index) {
ErasureMetaStatus::CanRecover => {
let session = Session::new_from_config(erasure_config).unwrap();
let erasure_result = recover(
db,
session,
&session,
slot,
erasure_meta,
index,
prev_inserted_blob_datas,
prev_inserted_coding,
erasure_config,
);
match erasure_result {
@ -1628,7 +1649,7 @@ fn try_erasure_recover(
let recovered = data.len() + coding.len();
assert_eq!(
ERASURE_SET_SIZE,
erasure_config.num_data() + erasure_config.num_coding(),
recovered
+ index.data().present_in_bounds(start_index..data_end_index)
+ index
@ -1694,9 +1715,8 @@ fn recover(
index: &Index,
prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>,
prev_inserted_coding: &HashMap<(u64, u64), Blob>,
erasure_config: &ErasureConfig,
) -> Result<(Vec<Blob>, Vec<Blob>)> {
use crate::erasure::{ERASURE_SET_SIZE, NUM_DATA};
let start_idx = erasure_meta.start_index();
let size = erasure_meta.size();
let data_cf = db.column::<cf::Data>();
@ -1709,8 +1729,9 @@ fn recover(
let (data_end_idx, coding_end_idx) = erasure_meta.end_indexes();
let present = &mut [true; ERASURE_SET_SIZE];
let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE);
let erasure_set_size = erasure_config.num_data() + erasure_config.num_coding();
let present = &mut vec![true; erasure_set_size];
let mut blobs = Vec::with_capacity(erasure_set_size);
for i in start_idx..data_end_idx {
if index.data().is_present(i) {
@ -1753,7 +1774,7 @@ fn recover(
blobs.push(blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size].to_vec());
} else {
trace!("[recover] absent coding blob at {}", i);
let set_relative_idx = (i - start_idx) as usize + NUM_DATA;
let set_relative_idx = (i - start_idx) as usize + erasure_config.num_data();
blobs.push(vec![0; size]);
present[set_relative_idx] = false;
}
@ -1919,7 +1940,7 @@ pub mod tests {
use crate::entry::{
create_ticks, make_tiny_test_entries, make_tiny_test_entries_from_hash, Entry, EntrySlice,
};
use crate::erasure::{CodingGenerator, NUM_CODING, NUM_DATA};
use crate::erasure::{CodingGenerator, ErasureConfig};
use crate::packet;
use rand::seq::SliceRandom;
use rand::thread_rng;
@ -2908,7 +2929,8 @@ pub mod tests {
let branching_factor: u64 = 4;
// Number of slots that will be in the tree
let num_slots = (branching_factor.pow(num_tree_levels) - 1) / (branching_factor - 1);
let entries_per_slot = NUM_DATA as u64;
let erasure_config = ErasureConfig::default();
let entries_per_slot = erasure_config.num_data() as u64;
assert!(entries_per_slot > 1);
let (mut blobs, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
@ -2939,9 +2961,9 @@ pub mod tests {
.cloned()
.map(|blob| Arc::new(RwLock::new(blob)))
.collect();
let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session));
let mut coding_generator = CodingGenerator::new_from_config(&erasure_config);
let coding_blobs = coding_generator.next(&shared_blobs);
assert_eq!(coding_blobs.len(), NUM_CODING);
assert_eq!(coding_blobs.len(), erasure_config.num_coding());
let mut rng = thread_rng();
@ -3437,7 +3459,7 @@ pub mod tests {
use super::*;
use crate::blocktree::meta::ErasureMetaStatus;
use crate::erasure::test::{generate_ledger_model, ErasureSpec, SlotSpec};
use crate::erasure::{CodingGenerator, NUM_CODING, NUM_DATA};
use crate::erasure::CodingGenerator;
use rand::{thread_rng, Rng};
use solana_sdk::signature::Signable;
use std::sync::RwLock;
@ -3457,8 +3479,9 @@ pub mod tests {
let path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&path).unwrap();
let erasure_config = ErasureConfig::default();
// two erasure sets
let num_blobs = NUM_DATA as u64 * 2;
let num_blobs = erasure_config.num_data() as u64 * 2;
let slot = 0;
let (mut blobs, _) = make_slot_entries(slot, 0, num_blobs);
@ -3481,11 +3504,13 @@ pub mod tests {
assert!(erasure_meta_opt.is_none());
blocktree.write_blobs(&blobs[2..NUM_DATA]).unwrap();
blocktree
.write_blobs(&blobs[2..erasure_config.num_data()])
.unwrap();
// insert all coding blobs in first set
let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session));
let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]);
let mut coding_generator = CodingGenerator::new_from_config(&erasure_config);
let coding_blobs = coding_generator.next(&shared_blobs[..erasure_config.num_data()]);
blocktree
.put_shared_coding_blobs(coding_blobs.iter())
@ -3500,11 +3525,11 @@ pub mod tests {
assert_eq!(erasure_meta.status(&index), DataFull);
// insert blob in the 2nd set so that recovery should be possible given all coding blobs
let set2 = &blobs[NUM_DATA..];
let set2 = &blobs[erasure_config.num_data()..];
blocktree.write_blobs(&set2[..1]).unwrap();
// insert all coding blobs in 2nd set. Should trigger recovery
let coding_blobs = coding_generator.next(&shared_blobs[NUM_DATA..]);
let coding_blobs = coding_generator.next(&shared_blobs[erasure_config.num_data()..]);
blocktree
.put_shared_coding_blobs(coding_blobs.iter())
@ -3542,14 +3567,16 @@ pub mod tests {
let slot = 0;
let ledger_path = get_tmp_ledger_path!();
let erasure_config = ErasureConfig::default();
let blocktree = Blocktree::open(&ledger_path).unwrap();
let num_sets = 3;
let data_blobs = make_slot_entries(slot, 0, num_sets * NUM_DATA as u64)
.0
.into_iter()
.map(Blob::into)
.collect::<Vec<_>>();
let data_blobs =
make_slot_entries(slot, 0, num_sets * erasure_config.num_data() as u64)
.0
.into_iter()
.map(Blob::into)
.collect::<Vec<_>>();
let keypair = Keypair::new();
data_blobs.iter().for_each(|blob: &Arc<RwLock<Blob>>| {
let mut b = blob.write().unwrap();
@ -3557,12 +3584,15 @@ pub mod tests {
b.sign(&keypair);
});
let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session));
let mut coding_generator = CodingGenerator::new_from_config(&erasure_config);
for (set_index, data_blobs) in data_blobs.chunks_exact(NUM_DATA).enumerate() {
for (set_index, data_blobs) in data_blobs
.chunks_exact(erasure_config.num_data())
.enumerate()
{
let coding_blobs = coding_generator.next(&data_blobs);
assert_eq!(coding_blobs.len(), NUM_CODING);
assert_eq!(coding_blobs.len(), erasure_config.num_coding());
let deleted_data = data_blobs[0].clone();
@ -3577,15 +3607,21 @@ pub mod tests {
// Verify the slot meta
let slot_meta = blocktree.meta(slot).unwrap().unwrap();
assert_eq!(slot_meta.consumed, (NUM_DATA * (set_index + 1)) as u64);
assert_eq!(slot_meta.received, (NUM_DATA * (set_index + 1)) as u64);
assert_eq!(
slot_meta.consumed,
(erasure_config.num_data() * (set_index + 1)) as u64
);
assert_eq!(
slot_meta.received,
(erasure_config.num_data() * (set_index + 1)) as u64
);
assert_eq!(slot_meta.parent_slot, 0);
assert!(slot_meta.next_slots.is_empty());
assert_eq!(slot_meta.is_connected, true);
if set_index as u64 == num_sets - 1 {
assert_eq!(
slot_meta.last_index,
(NUM_DATA * (set_index + 1) - 1) as u64
(erasure_config.num_data() * (set_index + 1) - 1) as u64
);
}
@ -3628,22 +3664,25 @@ pub mod tests {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
let erasure_config = ErasureConfig::default();
let blocktree = Blocktree::open(&ledger_path).unwrap();
let data_blobs = make_slot_entries(SLOT, 0, NUM_DATA as u64)
let data_blobs = make_slot_entries(SLOT, 0, erasure_config.num_data() as u64)
.0
.into_iter()
.map(Blob::into)
.collect::<Vec<_>>();
let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session));
let session = Arc::new(Session::new_from_config(&erasure_config).unwrap());
let mut coding_generator = CodingGenerator::new(Arc::clone(&session));
let shared_coding_blobs = coding_generator.next(&data_blobs);
assert_eq!(shared_coding_blobs.len(), NUM_CODING);
assert_eq!(shared_coding_blobs.len(), erasure_config.num_coding());
let mut prev_coding = HashMap::new();
let prev_data = HashMap::new();
let mut index = Index::new(SLOT);
let mut erasure_meta = ErasureMeta::new(SET_INDEX);
let mut erasure_meta = ErasureMeta::new(SET_INDEX, &erasure_config);
erasure_meta.size = shared_coding_blobs[0].read().unwrap().size();
for shared_blob in shared_coding_blobs.iter() {
@ -3652,18 +3691,19 @@ pub mod tests {
prev_coding.insert((blob.slot(), blob.index()), blob.clone());
}
index
.coding_mut()
.set_many_present((0..NUM_CODING as u64).zip(std::iter::repeat(true)));
index.coding_mut().set_many_present(
(0..erasure_config.num_coding() as u64).zip(std::iter::repeat(true)),
);
let (recovered_data, recovered_coding) = recover(
&blocktree.db,
&blocktree.session,
&session,
SLOT,
&erasure_meta,
&index,
&prev_data,
&prev_coding,
&erasure_config,
)
.expect("Successful recovery");
@ -3688,6 +3728,71 @@ pub mod tests {
}
}
/// Writes a partially-complete erasure set into `blocktree` at `slot` — all
/// data blobs except the last `num_drop_data`, plus every coding blob — and
/// returns the recovery status reported by the set's erasure metadata.
///
/// Test helper: lets callers verify recovery behavior for arbitrary
/// `ErasureConfig`s (see `test_recovery_different_configs`).
pub fn try_recovery_using_erasure_config(
    erasure_config: &ErasureConfig,
    num_drop_data: usize,
    slot: u64,
    blocktree: &Blocktree,
) -> ErasureMetaStatus {
    // One full erasure set worth of data entries for this config.
    let entries = make_tiny_test_entries(erasure_config.num_data());
    // `erasure_config` is already a reference; no extra borrow needed.
    let mut blobs = entries_to_blobs_using_config(&entries, slot, 0, true, erasure_config);

    let keypair = Keypair::new();
    blobs.iter_mut().for_each(|blob| {
        blob.set_id(&keypair.pubkey());
        blob.sign(&keypair);
    });

    let shared_blobs: Vec<_> = blobs
        .iter()
        .cloned()
        .map(|blob| Arc::new(RwLock::new(blob)))
        .collect();

    // Withhold the trailing `num_drop_data` data blobs so the coding blobs
    // are required to make the set whole.
    blocktree
        .write_blobs(&blobs[..(erasure_config.num_data() - num_drop_data)])
        .unwrap();

    let mut coding_generator = CodingGenerator::new_from_config(erasure_config);
    let coding_blobs = coding_generator.next(&shared_blobs[..erasure_config.num_data()]);

    blocktree
        .put_shared_coding_blobs(coding_blobs.iter())
        .unwrap();

    let erasure_meta = blocktree
        .erasure_meta(slot, 0)
        .expect("DB get must succeed")
        .unwrap();
    let index = blocktree.get_index(slot).unwrap().unwrap();

    erasure_meta.status(&index)
}
#[test]
fn test_recovery_different_configs() {
    use ErasureMetaStatus::DataFull;
    solana_logger::setup();

    let ledger_path = get_tmp_ledger_path!();
    let blocktree = Blocktree::open(&ledger_path).unwrap();

    // (config, data blobs withheld, slot) — every case must still reach
    // DataFull via erasure recovery.
    let cases = [
        (ErasureConfig::default(), 4, 0),
        (ErasureConfig::new(12, 8), 8, 1),
        (ErasureConfig::new(16, 4), 4, 2),
    ];
    for (config, num_drop_data, slot) in cases.iter() {
        assert_eq!(
            try_recovery_using_erasure_config(config, *num_drop_data, *slot, &blocktree),
            DataFull
        );
    }
}
#[test]
fn test_recovery_fails_safely() {
const SLOT: u64 = 0;
@ -3695,17 +3800,18 @@ pub mod tests {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
let erasure_config = ErasureConfig::default();
let blocktree = Blocktree::open(&ledger_path).unwrap();
let data_blobs = make_slot_entries(SLOT, 0, NUM_DATA as u64)
let data_blobs = make_slot_entries(SLOT, 0, erasure_config.num_data() as u64)
.0
.into_iter()
.map(Blob::into)
.collect::<Vec<_>>();
let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session));
let mut coding_generator = CodingGenerator::new_from_config(&erasure_config);
let shared_coding_blobs = coding_generator.next(&data_blobs);
assert_eq!(shared_coding_blobs.len(), NUM_CODING);
assert_eq!(shared_coding_blobs.len(), erasure_config.num_coding());
// Insert coding blobs except 1 and no data. Not enough to do recovery
blocktree
@ -3728,12 +3834,12 @@ pub mod tests {
let attempt_result = try_erasure_recover(
&blocktree.db,
&blocktree.session,
&erasure_meta,
&index,
SLOT,
&prev_inserted_blob_datas,
&prev_inserted_coding,
&erasure_config,
);
assert!(attempt_result.is_ok());
@ -3770,7 +3876,9 @@ pub mod tests {
let max_erasure_sets = 16;
solana_logger::setup();
let erasure_config = ErasureConfig::default();
let path = get_tmp_ledger_path!();
let blocktree = Arc::new(Blocktree::open(&path).unwrap());
let mut rng = thread_rng();
// Specification should generate a ledger where each slot has an random number of
@ -3786,11 +3894,14 @@ pub mod tests {
.map(|set_index| {
let (num_data, num_coding) = if set_index % 2 == 0 {
(
NUM_DATA - rng.gen_range(1, 3),
NUM_CODING - rng.gen_range(1, 3),
erasure_config.num_data() - rng.gen_range(1, 3),
erasure_config.num_coding() - rng.gen_range(1, 3),
)
} else {
(NUM_DATA - rng.gen_range(1, 5), NUM_CODING)
(
erasure_config.num_data() - rng.gen_range(1, 5),
erasure_config.num_coding(),
)
};
ErasureSpec {
set_index,
@ -3805,7 +3916,6 @@ pub mod tests {
.collect::<Vec<_>>();
let model = generate_ledger_model(specs);
let blocktree = Arc::new(Blocktree::open(&path).unwrap());
// Write to each slot in a different thread simultaneously.
// These writes should trigger the recovery. Every erasure set should have all of its
@ -3954,7 +4064,7 @@ pub mod tests {
// Should have all data
assert_eq!(
index.data().present_in_bounds(start_index..data_end_idx),
NUM_DATA
erasure_config.num_data()
);
}
}
@ -3964,17 +4074,19 @@ pub mod tests {
}
}
pub fn entries_to_blobs(
pub fn entries_to_blobs_using_config(
entries: &Vec<Entry>,
slot: u64,
parent_slot: u64,
is_full_slot: bool,
config: &ErasureConfig,
) -> Vec<Blob> {
let mut blobs = entries.clone().to_single_entry_blobs();
for (i, b) in blobs.iter_mut().enumerate() {
b.set_index(i as u64);
b.set_slot(slot);
b.set_parent(parent_slot);
b.set_erasure_config(config);
}
if is_full_slot {
blobs.last_mut().unwrap().set_is_last_in_slot();
@ -3982,6 +4094,21 @@ pub mod tests {
blobs
}
/// Converts `entries` into data blobs using the default erasure
/// configuration; see `entries_to_blobs_using_config` for the general form.
pub fn entries_to_blobs(
    entries: &Vec<Entry>,
    slot: u64,
    parent_slot: u64,
    is_full_slot: bool,
) -> Vec<Blob> {
    let config = ErasureConfig::default();
    entries_to_blobs_using_config(entries, slot, parent_slot, is_full_slot, &config)
}
pub fn make_slot_entries(
slot: u64,
parent_slot: u64,

View File

@ -1,4 +1,4 @@
use crate::erasure::{NUM_CODING, NUM_DATA};
use crate::erasure::ErasureConfig;
use solana_metrics::datapoint;
use std::{collections::BTreeSet, ops::RangeBounds};
@ -55,6 +55,8 @@ pub struct ErasureMeta {
pub set_index: u64,
/// Size of shards in this erasure set
pub size: usize,
/// Erasure configuration for this erasure set
config: ErasureConfig,
}
#[derive(Debug, PartialEq)]
@ -183,8 +185,12 @@ impl SlotMeta {
}
impl ErasureMeta {
pub fn new(set_index: u64) -> ErasureMeta {
ErasureMeta { set_index, size: 0 }
/// Creates a fresh `ErasureMeta` for `set_index` with zero recorded shard
/// size, carrying a copy of the set's erasure configuration.
pub fn new(set_index: u64, config: &ErasureConfig) -> ErasureMeta {
    let config = *config;
    ErasureMeta {
        set_index,
        size: 0,
        config,
    }
}
pub fn status(&self, index: &Index) -> ErasureMetaStatus {
@ -196,16 +202,19 @@ impl ErasureMeta {
let num_coding = index.coding().present_in_bounds(start_idx..coding_end_idx);
let num_data = index.data().present_in_bounds(start_idx..data_end_idx);
let (data_missing, coding_missing) = (NUM_DATA - num_data, NUM_CODING - num_coding);
let (data_missing, coding_missing) = (
self.config.num_data() - num_data,
self.config.num_coding() - num_coding,
);
let total_missing = data_missing + coding_missing;
if data_missing > 0 && total_missing <= NUM_CODING {
if data_missing > 0 && total_missing <= self.config.num_coding() {
CanRecover
} else if data_missing == 0 {
DataFull
} else {
StillNeed(total_missing - NUM_CODING)
StillNeed(total_missing - self.config.num_coding())
}
}
@ -217,18 +226,21 @@ impl ErasureMeta {
self.size
}
pub fn set_index_for(index: u64) -> u64 {
index / NUM_DATA as u64
/// Maps a data blob's index to the index of the erasure set it belongs to,
/// given the number of data blobs per set.
pub fn set_index_for(index: u64, num_data: usize) -> u64 {
    let set_width = num_data as u64;
    index / set_width
}
pub fn start_index(&self) -> u64 {
self.set_index * NUM_DATA as u64
self.set_index * self.config.num_data() as u64
}
/// returns a tuple of (data_end, coding_end)
pub fn end_indexes(&self) -> (u64, u64) {
let start = self.start_index();
(start + NUM_DATA as u64, start + NUM_CODING as u64)
(
start + self.config.num_data() as u64,
start + self.config.num_coding() as u64,
)
}
}
@ -243,16 +255,17 @@ mod test {
use ErasureMetaStatus::*;
let set_index = 0;
let erasure_config = ErasureConfig::default();
let mut e_meta = ErasureMeta::new(set_index);
let mut e_meta = ErasureMeta::new(set_index, &erasure_config);
let mut rng = thread_rng();
let mut index = Index::new(0);
e_meta.size = 1;
let data_indexes = 0..NUM_DATA as u64;
let coding_indexes = 0..NUM_CODING as u64;
let data_indexes = 0..erasure_config.num_data() as u64;
let coding_indexes = 0..erasure_config.num_coding() as u64;
assert_eq!(e_meta.status(&index), StillNeed(NUM_DATA));
assert_eq!(e_meta.status(&index), StillNeed(erasure_config.num_data()));
index
.data_mut()
@ -267,7 +280,7 @@ mod test {
for &idx in data_indexes
.clone()
.collect::<Vec<_>>()
.choose_multiple(&mut rng, NUM_DATA)
.choose_multiple(&mut rng, erasure_config.num_data())
{
index.data_mut().set_present(idx, false);
@ -280,7 +293,7 @@ mod test {
for &idx in coding_indexes
.collect::<Vec<_>>()
.choose_multiple(&mut rng, NUM_CODING)
.choose_multiple(&mut rng, erasure_config.num_coding())
{
index.coding_mut().set_present(idx, false);

View File

@ -5,7 +5,7 @@ use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastR
use self::standard_broadcast_run::StandardBroadcastRun;
use crate::blocktree::Blocktree;
use crate::cluster_info::{ClusterInfo, ClusterInfoError};
use crate::erasure::CodingGenerator;
use crate::erasure::{CodingGenerator, ErasureConfig};
use crate::poh_recorder::WorkingBankEntries;
use crate::result::{Error, Result};
use crate::service::Service;
@ -51,6 +51,7 @@ impl BroadcastStageType {
receiver: Receiver<WorkingBankEntries>,
exit_sender: &Arc<AtomicBool>,
blocktree: &Arc<Blocktree>,
erasure_config: &ErasureConfig,
) -> BroadcastStage {
match self {
BroadcastStageType::Standard => BroadcastStage::new(
@ -60,6 +61,7 @@ impl BroadcastStageType {
exit_sender,
blocktree,
StandardBroadcastRun::new(),
erasure_config,
),
BroadcastStageType::FailEntryVerification => BroadcastStage::new(
@ -69,6 +71,7 @@ impl BroadcastStageType {
exit_sender,
blocktree,
FailEntryVerificationBroadcastRun::new(),
erasure_config,
),
BroadcastStageType::BroadcastFakeBlobs => BroadcastStage::new(
@ -78,6 +81,7 @@ impl BroadcastStageType {
exit_sender,
blocktree,
BroadcastFakeBlobsRun::new(0),
erasure_config,
),
BroadcastStageType::BroadcastBadBlobSizes => BroadcastStage::new(
@ -87,6 +91,7 @@ impl BroadcastStageType {
exit_sender,
blocktree,
BroadcastBadBlobSizes::new(),
erasure_config,
),
}
}
@ -138,8 +143,9 @@ impl BroadcastStage {
receiver: &Receiver<WorkingBankEntries>,
blocktree: &Arc<Blocktree>,
mut broadcast_stage_run: impl BroadcastRun,
erasure_config: &ErasureConfig,
) -> BroadcastStageReturnType {
let coding_generator = CodingGenerator::default();
let coding_generator = CodingGenerator::new_from_config(erasure_config);
let mut broadcast = Broadcast {
coding_generator,
@ -191,9 +197,11 @@ impl BroadcastStage {
exit_sender: &Arc<AtomicBool>,
blocktree: &Arc<Blocktree>,
broadcast_stage_run: impl BroadcastRun + Send + 'static,
erasure_config: &ErasureConfig,
) -> Self {
let blocktree = blocktree.clone();
let exit_sender = exit_sender.clone();
let erasure_config = *erasure_config;
let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
@ -204,6 +212,7 @@ impl BroadcastStage {
&receiver,
&blocktree,
broadcast_stage_run,
&erasure_config,
)
})
.unwrap();
@ -277,6 +286,7 @@ mod test {
&exit_sender,
&blocktree,
StandardBroadcastRun::new(),
&ErasureConfig::default(),
);
MockBroadcastStage {

View File

@ -135,7 +135,7 @@ mod tests {
hasher.hash(&buf[..size]);
// golden needs to be updated if blob stuff changes....
let golden: Hash = "5FzYtpCqL7v6ZxZ1fW4wRkn8TK96NdiD8cLV59Rr7yav"
let golden: Hash = "Dy2V98ybxnp1mDTqXrUbsLE5LyKVpQN5zDhrEKCDEFhH"
.parse()
.unwrap();

View File

@ -1652,6 +1652,7 @@ mod tests {
use crate::blocktree::tests::make_many_slot_entries;
use crate::blocktree::Blocktree;
use crate::crds_value::CrdsValueLabel;
use crate::erasure::ErasureConfig;
use crate::packet::BLOB_HEADER_SIZE;
use crate::repair_service::RepairType;
use crate::result::Error;
@ -1816,6 +1817,7 @@ mod tests {
w_blob.set_size(data_size);
w_blob.set_index(1);
w_blob.set_slot(2);
w_blob.set_erasure_config(&ErasureConfig::default());
w_blob.meta.size = data_size + BLOB_HEADER_SIZE;
}
@ -1860,6 +1862,7 @@ mod tests {
blob.set_size(data_size);
blob.set_index(i);
blob.set_slot(2);
blob.set_erasure_config(&ErasureConfig::default());
blob.meta.size = data_size + BLOB_HEADER_SIZE;
blob
})

View File

@ -55,6 +55,38 @@ pub const NUM_CODING: usize = 8;
/// Total number of blobs in an erasure set; includes data and coding blobs
pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING;
/// Per-set erasure coding parameters: how many data and coding blobs make up
/// one erasure set. Serialized into each blob header, so the field layout
/// must stay stable.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ErasureConfig {
    num_data: usize,
    num_coding: usize,
}

impl Default for ErasureConfig {
    /// The historical fixed configuration (`NUM_DATA` data / `NUM_CODING`
    /// coding blobs per set).
    fn default() -> ErasureConfig {
        ErasureConfig::new(NUM_DATA, NUM_CODING)
    }
}

impl ErasureConfig {
    /// Builds a config with the given data/coding shard counts.
    pub fn new(num_data: usize, num_coding: usize) -> ErasureConfig {
        ErasureConfig {
            num_data,
            num_coding,
        }
    }

    /// Number of data blobs per erasure set.
    pub fn num_data(self) -> usize {
        self.num_data
    }

    /// Number of coding (parity) blobs per erasure set.
    pub fn num_coding(self) -> usize {
        self.num_coding
    }
}
type Result<T> = std::result::Result<T, reed_solomon_erasure::Error>;
/// Represents an erasure "session" with a particular configuration and number of data and coding
@ -77,6 +109,12 @@ impl Session {
Ok(Session(rs))
}
/// Builds a Reed-Solomon session sized to the given erasure configuration,
/// propagating any shard-count error from the underlying codec.
pub fn new_from_config(config: &ErasureConfig) -> Result<Session> {
    Ok(Session(ReedSolomon::new(config.num_data, config.num_coding)?))
}
/// Create coding blocks by overwriting `parity`
pub fn encode(&self, data: &[&[u8]], parity: &mut [&mut [u8]]) -> Result<()> {
self.0.encode_sep(data, parity)?;
@ -136,7 +174,7 @@ impl Session {
let idx;
let first_byte;
if n < NUM_DATA {
if n < self.0.data_shard_count() {
let mut blob = Blob::new(&blocks[n]);
blob.meta.size = blob.data_size() as usize;
@ -181,6 +219,13 @@ impl CodingGenerator {
}
}
/// Builds a `CodingGenerator` with its own session sized to `config`.
///
/// Panics if `config` is rejected by the Reed-Solomon codec (e.g. a zero
/// shard count) — callers are expected to supply a valid configuration.
pub fn new_from_config(config: &ErasureConfig) -> Self {
    // State the invariant instead of a bare unwrap: session creation only
    // fails for invalid shard counts in `config`.
    let session = Session::new_from_config(config)
        .expect("invalid erasure config: failed to create Reed-Solomon session");
    CodingGenerator {
        leftover: Vec::with_capacity(config.num_data),
        session: Arc::new(session),
    }
}
/// Yields next set of coding blobs, if any.
/// Must be called with consecutive data blobs within a slot.
///
@ -235,6 +280,7 @@ impl CodingGenerator {
coding_blob.set_id(&id);
coding_blob.set_size(max_data_size);
coding_blob.set_coding();
coding_blob.set_erasure_config(&data_blob.erasure_config());
coding_blobs.push(coding_blob);
}
@ -744,6 +790,7 @@ pub mod test {
let mut blob = Blob::default();
blob.data_mut()[..].copy_from_slice(&data);
blob.set_size(BLOB_DATA_SIZE);
blob.set_erasure_config(&ErasureConfig::default());
Arc::new(RwLock::new(blob))
})
.collect();

View File

@ -1,5 +1,6 @@
//! The `packet` module defines data structures and methods to pull data from the network.
use crate::cuda_runtime::PinnedVec;
use crate::erasure::ErasureConfig;
use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
use crate::recycler::{Recycler, Reset};
use crate::result::{Error, Result};
@ -389,7 +390,8 @@ const SLOT_RANGE: std::ops::Range<usize> = range!(PARENT_RANGE.end, u64);
const INDEX_RANGE: std::ops::Range<usize> = range!(SLOT_RANGE.end, u64);
const ID_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Pubkey);
const FLAGS_RANGE: std::ops::Range<usize> = range!(ID_RANGE.end, u32);
const SIZE_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, u64);
const ERASURE_CONFIG_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, ErasureConfig);
const SIZE_RANGE: std::ops::Range<usize> = range!(ERASURE_CONFIG_RANGE.end, u64);
macro_rules! align {
($x:expr, $align:expr) => {
@ -426,6 +428,7 @@ impl Blob {
out.position() as usize
};
blob.set_size(pos);
blob.set_erasure_config(&ErasureConfig::default());
blob
}
@ -448,6 +451,14 @@ impl Blob {
LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix);
}
/// Serializes `config` into the blob header's erasure-config slot
/// (`ERASURE_CONFIG_RANGE`).
/// NOTE(review): `copy_from_slice` panics if the bincode output length ever
/// differs from the range width — assumes `ErasureConfig`'s serialized size
/// is fixed; confirm against the `range!` declaration.
pub fn set_erasure_config(&mut self, config: &ErasureConfig) {
    self.data[ERASURE_CONFIG_RANGE].copy_from_slice(&bincode::serialize(config).unwrap())
}
/// Deserializes the erasure config from the blob header, falling back to
/// `ErasureConfig::default()` if the stored bytes fail to decode.
pub fn erasure_config(&self) -> ErasureConfig {
    bincode::deserialize(&self.data[ERASURE_CONFIG_RANGE]).unwrap_or_default()
}
pub fn seed(&self) -> [u8; 32] {
let mut seed = [0; 32];
let seed_len = seed.len();
@ -807,6 +818,15 @@ mod tests {
assert!(!b.should_forward());
}
#[test]
fn test_blob_erasure_config() {
    // Round-trip a non-default config through the blob header.
    let config = ErasureConfig::new(32, 16);
    let mut blob = Blob::default();
    blob.set_erasure_config(&config);
    assert_eq!(blob.erasure_config(), config);
}
#[test]
fn test_store_blobs_max() {
let serialized_size_size = bincode::serialized_size(&0usize).unwrap() as usize;

View File

@ -687,6 +687,7 @@ mod test {
use super::*;
use crate::blocktree::get_tmp_ledger_path;
use crate::entry;
use crate::erasure::ErasureConfig;
use crate::genesis_utils::create_genesis_block;
use crate::packet::{Blob, BLOB_HEADER_SIZE};
use crate::replay_stage::ReplayStage;
@ -716,6 +717,7 @@ mod test {
let mut blob_slot_1 = Blob::default();
blob_slot_1.set_slot(1);
blob_slot_1.set_parent(0);
blob_slot_1.set_erasure_config(&ErasureConfig::default());
blocktree.insert_data_blobs(&vec![blob_slot_1]).unwrap();
assert!(bank_forks.get(1).is_none());
ReplayStage::generate_new_bank_forks(
@ -729,6 +731,7 @@ mod test {
let mut blob_slot_2 = Blob::default();
blob_slot_2.set_slot(2);
blob_slot_2.set_parent(0);
blob_slot_2.set_erasure_config(&ErasureConfig::default());
blocktree.insert_data_blobs(&vec![blob_slot_2]).unwrap();
assert!(bank_forks.get(2).is_none());
ReplayStage::generate_new_bank_forks(

View File

@ -6,6 +6,7 @@ use crate::blocktree::Blocktree;
use crate::broadcast_stage::{BroadcastStage, BroadcastStageType};
use crate::cluster_info::ClusterInfo;
use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
use crate::erasure::ErasureConfig;
use crate::fetch_stage::FetchStage;
use crate::poh_recorder::{PohRecorder, WorkingBankEntries};
use crate::service::Service;
@ -37,6 +38,7 @@ impl Tpu {
sigverify_disabled: bool,
blocktree: &Arc<Blocktree>,
broadcast_type: &BroadcastStageType,
erasure_config: &ErasureConfig,
exit: &Arc<AtomicBool>,
) -> Self {
let (packet_sender, packet_receiver) = channel();
@ -74,6 +76,7 @@ impl Tpu {
entry_receiver,
&exit,
blocktree,
erasure_config,
);
Self {

View File

@ -6,6 +6,7 @@ use crate::blocktree_processor::{self, BankForksInfo};
use crate::broadcast_stage::BroadcastStageType;
use crate::cluster_info::{ClusterInfo, Node};
use crate::contact_info::ContactInfo;
use crate::erasure::ErasureConfig;
use crate::gossip_service::{discover_cluster, GossipService};
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::poh_recorder::PohRecorder;
@ -40,6 +41,7 @@ pub struct ValidatorConfig {
pub rpc_config: JsonRpcConfig,
pub snapshot_path: Option<String>,
pub broadcast_stage_type: BroadcastStageType,
pub erasure_config: ErasureConfig,
}
impl Default for ValidatorConfig {
@ -53,6 +55,7 @@ impl Default for ValidatorConfig {
rpc_config: JsonRpcConfig::default(),
snapshot_path: None,
broadcast_stage_type: BroadcastStageType::Standard,
erasure_config: ErasureConfig::default(),
}
}
}
@ -264,6 +267,7 @@ impl Validator {
config.sigverify_disabled,
&blocktree,
&config.broadcast_stage_type,
&config.erasure_config,
&exit,
);