diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 07f2b07bc7..5a0ce818fd 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -2,7 +2,7 @@ //! Proof of History ledger as well as iterative read, append write, and random //! access read to a persistent file-based ledger. use crate::entry::Entry; -use crate::erasure::{self, Session}; +use crate::erasure::{ErasureConfig, Session}; use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; use crate::result::{Error, Result}; @@ -90,7 +90,6 @@ pub struct Blocktree { orphans_cf: LedgerColumn, index_cf: LedgerColumn, batch_processor: Arc>, - session: Arc, pub new_blobs_signals: Vec>, pub completed_slots_senders: Vec>>, } @@ -144,9 +143,6 @@ impl Blocktree { let orphans_cf = db.column(); let index_cf = db.column(); - // setup erasure - let session = Arc::new(erasure::Session::default()); - let db = Arc::new(db); Ok(Blocktree { @@ -158,7 +154,6 @@ impl Blocktree { erasure_meta_cf, orphans_cf, index_cf, - session, new_blobs_signals: vec![], batch_processor, completed_slots_senders: vec![], @@ -328,11 +323,22 @@ impl Blocktree { let mut slot_meta_working_set = HashMap::new(); let mut erasure_meta_working_set = HashMap::new(); let mut index_working_set = HashMap::new(); + let mut erasure_config_opt = None; for blob in new_blobs.iter() { let blob = blob.borrow(); assert!(!blob.is_coding()); + match erasure_config_opt { + Some(config) => { + if config != blob.erasure_config() { + // ToDo: This is a potential slashing condition + error!("Multiple erasure config for the same slot."); + } + } + None => erasure_config_opt = Some(blob.erasure_config()), + } + let blob_slot = blob.slot(); let _ = index_working_set.entry(blob_slot).or_insert_with(|| { @@ -342,7 +348,8 @@ impl Blocktree { .unwrap_or_else(|| Index::new(blob_slot)) }); - let set_index = ErasureMeta::set_index_for(blob.index()); + let set_index = + ErasureMeta::set_index_for(blob.index(), erasure_config_opt.unwrap().num_data()); if let Some(erasure_meta) = self.erasure_meta_cf.get((blob_slot, set_index))? { erasure_meta_working_set.insert((blob_slot, set_index), erasure_meta); } @@ -350,12 +357,12 @@ impl Blocktree { let recovered_data_opt = handle_recovery( &self.db, - &self.session, &erasure_meta_working_set, &mut index_working_set, &prev_inserted_blob_datas, &mut prev_inserted_coding, &mut write_batch, + &erasure_config_opt.unwrap_or_default(), )?; if let Some(recovered_data) = recovered_data_opt { @@ -540,13 +547,25 @@ impl Blocktree { let mut prev_inserted_coding = HashMap::new(); let mut prev_inserted_blob_datas = HashMap::new(); + let mut erasure_config_opt = None; + for blob_item in blobs { let blob = blob_item.borrow(); assert!(blob.is_coding()); + match erasure_config_opt { + Some(config) => { + if config != blob.erasure_config() { + // ToDo: This is a potential slashing condition + error!("Multiple erasure config for the same slot."); + } + } + None => erasure_config_opt = Some(blob.erasure_config()), + } + let (blob_slot, blob_index, blob_size) = (blob.slot(), blob.index(), blob.size() as usize); - let set_index = blob_index / crate::erasure::NUM_CODING as u64; + let set_index = blob_index / blob.erasure_config().num_coding() as u64; writebatch.put_bytes::( (blob_slot, blob_index), @@ -566,7 +585,9 @@ impl Blocktree { self.erasure_meta_cf .get((blob_slot, set_index)) .expect("Expect database get to succeed") - .unwrap_or_else(|| ErasureMeta::new(set_index)) + .unwrap_or_else(|| { + ErasureMeta::new(set_index, &erasure_config_opt.unwrap()) + }) }); // size should be the same for all coding blobs, else there's a bug @@ -581,12 +602,12 @@ impl Blocktree { let recovered_data_opt = handle_recovery( &self.db, - &self.session, &erasure_metas, &mut index_working_set, &prev_inserted_blob_datas, &mut prev_inserted_coding, &mut writebatch, + &erasure_config_opt.unwrap_or_default(), )?; if let Some(recovered_data) = recovered_data_opt { @@ -1475,12 +1496,12 @@ fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option, index_working_set: &mut HashMap, prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, prev_inserted_coding: &mut HashMap<(u64, u64), Blob>, writebatch: &mut WriteBatch, + erasure_config: &ErasureConfig, ) -> Result>> { use solana_sdk::signature::Signable; @@ -1491,12 +1512,12 @@ fn handle_recovery( if let Some((mut data, coding)) = try_erasure_recover( db, - session, &erasure_meta, index, slot, &prev_inserted_blob_datas, &prev_inserted_coding, + erasure_config, )? { for blob in data.iter() { debug!( @@ -1587,15 +1608,13 @@ fn handle_recovery( /// Attempts recovery using erasure coding fn try_erasure_recover( db: &Database, - session: &Session, erasure_meta: &ErasureMeta, index: &Index, slot: u64, prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, prev_inserted_coding: &HashMap<(u64, u64), Blob>, + erasure_config: &ErasureConfig, ) -> Result, Vec)>> { - use crate::erasure::ERASURE_SET_SIZE; - let set_index = erasure_meta.set_index; let start_index = erasure_meta.start_index(); let (data_end_index, coding_end_idx) = erasure_meta.end_indexes(); @@ -1613,14 +1632,16 @@ fn try_erasure_recover( let blobs = match erasure_meta.status(index) { ErasureMetaStatus::CanRecover => { + let session = Session::new_from_config(erasure_config).unwrap(); let erasure_result = recover( db, - session, + &session, slot, erasure_meta, index, prev_inserted_blob_datas, prev_inserted_coding, + erasure_config, ); match erasure_result { @@ -1628,7 +1649,7 @@ fn try_erasure_recover( let recovered = data.len() + coding.len(); assert_eq!( - ERASURE_SET_SIZE, + erasure_config.num_data() + erasure_config.num_coding(), recovered + index.data().present_in_bounds(start_index..data_end_index) + index @@ -1694,9 +1715,8 @@ fn recover( index: &Index, prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>, prev_inserted_coding: &HashMap<(u64, u64), Blob>, + erasure_config: &ErasureConfig, ) -> Result<(Vec, Vec)> { - use crate::erasure::{ERASURE_SET_SIZE, NUM_DATA}; - let start_idx = erasure_meta.start_index(); let size = erasure_meta.size(); let data_cf = db.column::(); @@ -1709,8 +1729,9 @@ fn recover( let (data_end_idx, coding_end_idx) = erasure_meta.end_indexes(); - let present = &mut [true; ERASURE_SET_SIZE]; - let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE); + let erasure_set_size = erasure_config.num_data() + erasure_config.num_coding(); + let present = &mut vec![true; erasure_set_size]; + let mut blobs = Vec::with_capacity(erasure_set_size); for i in start_idx..data_end_idx { if index.data().is_present(i) { @@ -1753,7 +1774,7 @@ fn recover( blobs.push(blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size].to_vec()); } else { trace!("[recover] absent coding blob at {}", i); - let set_relative_idx = (i - start_idx) as usize + NUM_DATA; + let set_relative_idx = (i - start_idx) as usize + erasure_config.num_data(); blobs.push(vec![0; size]); present[set_relative_idx] = false; } @@ -1919,7 +1940,7 @@ pub mod tests { use crate::entry::{ create_ticks, make_tiny_test_entries, make_tiny_test_entries_from_hash, Entry, EntrySlice, }; - use crate::erasure::{CodingGenerator, NUM_CODING, NUM_DATA}; + use crate::erasure::{CodingGenerator, ErasureConfig}; use crate::packet; use rand::seq::SliceRandom; use rand::thread_rng; @@ -2908,7 +2929,8 @@ pub mod tests { let branching_factor: u64 = 4; // Number of slots that will be in the tree let num_slots = (branching_factor.pow(num_tree_levels) - 1) / (branching_factor - 1); - let entries_per_slot = NUM_DATA as u64; + let erasure_config = ErasureConfig::default(); + let entries_per_slot = erasure_config.num_data() as u64; assert!(entries_per_slot > 1); let (mut blobs, _) = make_many_slot_entries(0, num_slots, entries_per_slot); @@ -2939,9 +2961,9 @@ pub mod tests { .cloned() .map(|blob| Arc::new(RwLock::new(blob))) .collect(); - let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); + let mut coding_generator = CodingGenerator::new_from_config(&erasure_config); let coding_blobs = coding_generator.next(&shared_blobs); - assert_eq!(coding_blobs.len(), NUM_CODING); + assert_eq!(coding_blobs.len(), erasure_config.num_coding()); let mut rng = thread_rng(); @@ -3437,7 +3459,7 @@ pub mod tests { use super::*; use crate::blocktree::meta::ErasureMetaStatus; use crate::erasure::test::{generate_ledger_model, ErasureSpec, SlotSpec}; - use crate::erasure::{CodingGenerator, NUM_CODING, NUM_DATA}; + use crate::erasure::CodingGenerator; use rand::{thread_rng, Rng}; use solana_sdk::signature::Signable; use std::sync::RwLock; @@ -3457,8 +3479,9 @@ pub mod tests { let path = get_tmp_ledger_path!(); let blocktree = Blocktree::open(&path).unwrap(); + let erasure_config = ErasureConfig::default(); // two erasure sets - let num_blobs = NUM_DATA as u64 * 2; + let num_blobs = erasure_config.num_data() as u64 * 2; let slot = 0; let (mut blobs, _) = make_slot_entries(slot, 0, num_blobs); @@ -3481,11 +3504,13 @@ pub mod tests { assert!(erasure_meta_opt.is_none()); - blocktree.write_blobs(&blobs[2..NUM_DATA]).unwrap(); + blocktree + .write_blobs(&blobs[2..erasure_config.num_data()]) + .unwrap(); // insert all coding blobs in first set - let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); - let coding_blobs = coding_generator.next(&shared_blobs[..NUM_DATA]); + let mut coding_generator = CodingGenerator::new_from_config(&erasure_config); + let coding_blobs = coding_generator.next(&shared_blobs[..erasure_config.num_data()]); blocktree .put_shared_coding_blobs(coding_blobs.iter()) @@ -3500,11 +3525,11 @@ pub mod tests { assert_eq!(erasure_meta.status(&index), DataFull); // insert blob in the 2nd set so that recovery should be possible given all coding blobs - let set2 = &blobs[NUM_DATA..]; + let set2 = &blobs[erasure_config.num_data()..]; blocktree.write_blobs(&set2[..1]).unwrap(); // insert all coding blobs in 2nd set. Should trigger recovery - let coding_blobs = coding_generator.next(&shared_blobs[NUM_DATA..]); + let coding_blobs = coding_generator.next(&shared_blobs[erasure_config.num_data()..]); blocktree .put_shared_coding_blobs(coding_blobs.iter()) @@ -3542,14 +3567,16 @@ pub mod tests { let slot = 0; let ledger_path = get_tmp_ledger_path!(); + let erasure_config = ErasureConfig::default(); let blocktree = Blocktree::open(&ledger_path).unwrap(); let num_sets = 3; - let data_blobs = make_slot_entries(slot, 0, num_sets * NUM_DATA as u64) - .0 - .into_iter() - .map(Blob::into) - .collect::>(); + let data_blobs = + make_slot_entries(slot, 0, num_sets * erasure_config.num_data() as u64) + .0 + .into_iter() + .map(Blob::into) + .collect::>(); let keypair = Keypair::new(); data_blobs.iter().for_each(|blob: &Arc>| { let mut b = blob.write().unwrap(); @@ -3557,12 +3584,15 @@ pub mod tests { b.sign(&keypair); }); - let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); + let mut coding_generator = CodingGenerator::new_from_config(&erasure_config); - for (set_index, data_blobs) in data_blobs.chunks_exact(NUM_DATA).enumerate() { + for (set_index, data_blobs) in data_blobs + .chunks_exact(erasure_config.num_data()) + .enumerate() + { let coding_blobs = coding_generator.next(&data_blobs); - assert_eq!(coding_blobs.len(), NUM_CODING); + assert_eq!(coding_blobs.len(), erasure_config.num_coding()); let deleted_data = data_blobs[0].clone(); @@ -3577,15 +3607,21 @@ pub mod tests { // Verify the slot meta let slot_meta = blocktree.meta(slot).unwrap().unwrap(); - assert_eq!(slot_meta.consumed, (NUM_DATA * (set_index + 1)) as u64); - assert_eq!(slot_meta.received, (NUM_DATA * (set_index + 1)) as u64); + assert_eq!( + slot_meta.consumed, + (erasure_config.num_data() * (set_index + 1)) as u64 + ); + assert_eq!( + slot_meta.received, + (erasure_config.num_data() * (set_index + 1)) as u64 + ); assert_eq!(slot_meta.parent_slot, 0); assert!(slot_meta.next_slots.is_empty()); assert_eq!(slot_meta.is_connected, true); if set_index as u64 == num_sets - 1 { assert_eq!( slot_meta.last_index, - (NUM_DATA * (set_index + 1) - 1) as u64 + (erasure_config.num_data() * (set_index + 1) - 1) as u64 ); } @@ -3628,22 +3664,25 @@ pub mod tests { solana_logger::setup(); let ledger_path = get_tmp_ledger_path!(); + let erasure_config = ErasureConfig::default(); let blocktree = Blocktree::open(&ledger_path).unwrap(); - let data_blobs = make_slot_entries(SLOT, 0, NUM_DATA as u64) + let data_blobs = make_slot_entries(SLOT, 0, erasure_config.num_data() as u64) .0 .into_iter() .map(Blob::into) .collect::>(); - let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); + let session = Arc::new(Session::new_from_config(&erasure_config).unwrap()); + + let mut coding_generator = CodingGenerator::new(Arc::clone(&session)); let shared_coding_blobs = coding_generator.next(&data_blobs); - assert_eq!(shared_coding_blobs.len(), NUM_CODING); + assert_eq!(shared_coding_blobs.len(), erasure_config.num_coding()); let mut prev_coding = HashMap::new(); let prev_data = HashMap::new(); let mut index = Index::new(SLOT); - let mut erasure_meta = ErasureMeta::new(SET_INDEX); + let mut erasure_meta = ErasureMeta::new(SET_INDEX, &erasure_config); erasure_meta.size = shared_coding_blobs[0].read().unwrap().size(); for shared_blob in shared_coding_blobs.iter() { @@ -3652,18 +3691,19 @@ pub mod tests { prev_coding.insert((blob.slot(), blob.index()), blob.clone()); } - index - .coding_mut() - .set_many_present((0..NUM_CODING as u64).zip(std::iter::repeat(true))); + index.coding_mut().set_many_present( + (0..erasure_config.num_coding() as u64).zip(std::iter::repeat(true)), + ); let (recovered_data, recovered_coding) = recover( &blocktree.db, - &blocktree.session, + &session, SLOT, &erasure_meta, &index, &prev_data, &prev_coding, + &erasure_config, ) .expect("Successful recovery"); @@ -3688,6 +3728,71 @@ pub mod tests { } } + pub fn try_recovery_using_erasure_config( + erasure_config: &ErasureConfig, + num_drop_data: usize, + slot: u64, + blocktree: &Blocktree, + ) -> ErasureMetaStatus { + let entries = make_tiny_test_entries(erasure_config.num_data()); + let mut blobs = entries_to_blobs_using_config(&entries, slot, 0, true, &erasure_config); + + let keypair = Keypair::new(); + blobs.iter_mut().for_each(|blob| { + blob.set_id(&keypair.pubkey()); + blob.sign(&keypair); + }); + + let shared_blobs: Vec<_> = blobs + .iter() + .cloned() + .map(|blob| Arc::new(RwLock::new(blob))) + .collect(); + + blocktree + .write_blobs(&blobs[..(erasure_config.num_data() - num_drop_data)]) + .unwrap(); + + let mut coding_generator = CodingGenerator::new_from_config(&erasure_config); + let coding_blobs = coding_generator.next(&shared_blobs[..erasure_config.num_data()]); + + blocktree + .put_shared_coding_blobs(coding_blobs.iter()) + .unwrap(); + + let erasure_meta = blocktree + .erasure_meta(slot, 0) + .expect("DB get must succeed") + .unwrap(); + let index = blocktree.get_index(slot).unwrap().unwrap(); + + erasure_meta.status(&index) + } + + #[test] + fn test_recovery_different_configs() { + use ErasureMetaStatus::DataFull; + solana_logger::setup(); + + let ledger_path = get_tmp_ledger_path!(); + let blocktree = Blocktree::open(&ledger_path).unwrap(); + + assert_eq!( + try_recovery_using_erasure_config(&ErasureConfig::default(), 4, 0, &blocktree), + DataFull + ); + + assert_eq!( + try_recovery_using_erasure_config(&ErasureConfig::new(12, 8), 8, 1, &blocktree), + DataFull + ); + + assert_eq!( + try_recovery_using_erasure_config(&ErasureConfig::new(16, 4), 4, 2, &blocktree), + DataFull + ); + } + #[test] fn test_recovery_fails_safely() { const SLOT: u64 = 0; @@ -3695,17 +3800,18 @@ pub mod tests { solana_logger::setup(); let ledger_path = get_tmp_ledger_path!(); + let erasure_config = ErasureConfig::default(); let blocktree = Blocktree::open(&ledger_path).unwrap(); - let data_blobs = make_slot_entries(SLOT, 0, NUM_DATA as u64) + let data_blobs = make_slot_entries(SLOT, 0, erasure_config.num_data() as u64) .0 .into_iter() .map(Blob::into) .collect::>(); - let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session)); + let mut coding_generator = CodingGenerator::new_from_config(&erasure_config); let shared_coding_blobs = coding_generator.next(&data_blobs); - assert_eq!(shared_coding_blobs.len(), NUM_CODING); + assert_eq!(shared_coding_blobs.len(), erasure_config.num_coding()); // Insert coding blobs except 1 and no data. Not enough to do recovery blocktree @@ -3728,12 +3834,12 @@ pub mod tests { let attempt_result = try_erasure_recover( &blocktree.db, - &blocktree.session, &erasure_meta, &index, SLOT, &prev_inserted_blob_datas, &prev_inserted_coding, + &erasure_config, ); assert!(attempt_result.is_ok()); @@ -3770,7 +3876,9 @@ pub mod tests { let max_erasure_sets = 16; solana_logger::setup(); + let erasure_config = ErasureConfig::default(); let path = get_tmp_ledger_path!(); + let blocktree = Arc::new(Blocktree::open(&path).unwrap()); let mut rng = thread_rng(); // Specification should generate a ledger where each slot has an random number of @@ -3786,11 +3894,14 @@ pub mod tests { .map(|set_index| { let (num_data, num_coding) = if set_index % 2 == 0 { ( - NUM_DATA - rng.gen_range(1, 3), - NUM_CODING - rng.gen_range(1, 3), + erasure_config.num_data() - rng.gen_range(1, 3), + erasure_config.num_coding() - rng.gen_range(1, 3), ) } else { - (NUM_DATA - rng.gen_range(1, 5), NUM_CODING) + ( + erasure_config.num_data() - rng.gen_range(1, 5), + erasure_config.num_coding(), + ) }; ErasureSpec { set_index, @@ -3805,7 +3916,6 @@ pub mod tests { .collect::>(); let model = generate_ledger_model(specs); - let blocktree = Arc::new(Blocktree::open(&path).unwrap()); // Write to each slot in a different thread simultaneously. // These writes should trigger the recovery. Every erasure set should have all of its @@ -3954,7 +4064,7 @@ pub mod tests { // Should have all data assert_eq!( index.data().present_in_bounds(start_index..data_end_idx), - NUM_DATA + erasure_config.num_data() ); } } @@ -3964,17 +4074,19 @@ pub mod tests { } } - pub fn entries_to_blobs( + pub fn entries_to_blobs_using_config( entries: &Vec, slot: u64, parent_slot: u64, is_full_slot: bool, + config: &ErasureConfig, ) -> Vec { let mut blobs = entries.clone().to_single_entry_blobs(); for (i, b) in blobs.iter_mut().enumerate() { b.set_index(i as u64); b.set_slot(slot); b.set_parent(parent_slot); + b.set_erasure_config(config); } if is_full_slot { blobs.last_mut().unwrap().set_is_last_in_slot(); @@ -3982,6 +4094,21 @@ pub mod tests { blobs } + pub fn entries_to_blobs( + entries: &Vec, + slot: u64, + parent_slot: u64, + is_full_slot: bool, + ) -> Vec { + entries_to_blobs_using_config( + entries, + slot, + parent_slot, + is_full_slot, + &ErasureConfig::default(), + ) + } + pub fn make_slot_entries( slot: u64, parent_slot: u64, diff --git a/core/src/blocktree/meta.rs b/core/src/blocktree/meta.rs index 7f84afae66..6e12d215cb 100644 --- a/core/src/blocktree/meta.rs +++ b/core/src/blocktree/meta.rs @@ -1,4 +1,4 @@ -use crate::erasure::{NUM_CODING, NUM_DATA}; +use crate::erasure::ErasureConfig; use solana_metrics::datapoint; use std::{collections::BTreeSet, ops::RangeBounds}; @@ -55,6 +55,8 @@ pub struct ErasureMeta { pub set_index: u64, /// Size of shards in this erasure set pub size: usize, + /// Erasure configuration for this erasure set + config: ErasureConfig, } #[derive(Debug, PartialEq)] @@ -183,8 +185,12 @@ impl SlotMeta { } impl ErasureMeta { - pub fn new(set_index: u64) -> ErasureMeta { - ErasureMeta { set_index, size: 0 } + pub fn new(set_index: u64, config: &ErasureConfig) -> ErasureMeta { + ErasureMeta { + set_index, + size: 0, + config: *config, + } } pub fn status(&self, index: &Index) -> ErasureMetaStatus { @@ -196,16 +202,19 @@ impl ErasureMeta { let num_coding = index.coding().present_in_bounds(start_idx..coding_end_idx); let num_data = index.data().present_in_bounds(start_idx..data_end_idx); - let (data_missing, coding_missing) = (NUM_DATA - num_data, NUM_CODING - num_coding); + let (data_missing, coding_missing) = ( + self.config.num_data() - num_data, + self.config.num_coding() - num_coding, + ); let total_missing = data_missing + coding_missing; - if data_missing > 0 && total_missing <= NUM_CODING { + if data_missing > 0 && total_missing <= self.config.num_coding() { CanRecover } else if data_missing == 0 { DataFull } else { - StillNeed(total_missing - NUM_CODING) + StillNeed(total_missing - self.config.num_coding()) } } @@ -217,18 +226,21 @@ impl ErasureMeta { self.size } - pub fn set_index_for(index: u64) -> u64 { - index / NUM_DATA as u64 + pub fn set_index_for(index: u64, num_data: usize) -> u64 { + index / num_data as u64 } pub fn start_index(&self) -> u64 { - self.set_index * NUM_DATA as u64 + self.set_index * self.config.num_data() as u64 } /// returns a tuple of (data_end, coding_end) pub fn end_indexes(&self) -> (u64, u64) { let start = self.start_index(); - (start + NUM_DATA as u64, start + NUM_CODING as u64) + ( + start + self.config.num_data() as u64, + start + self.config.num_coding() as u64, + ) } } @@ -243,16 +255,17 @@ mod test { use ErasureMetaStatus::*; let set_index = 0; + let erasure_config = ErasureConfig::default(); - let mut e_meta = ErasureMeta::new(set_index); + let mut e_meta = ErasureMeta::new(set_index, &erasure_config); let mut rng = thread_rng(); let mut index = Index::new(0); e_meta.size = 1; - let data_indexes = 0..NUM_DATA as u64; - let coding_indexes = 0..NUM_CODING as u64; + let data_indexes = 0..erasure_config.num_data() as u64; + let coding_indexes = 0..erasure_config.num_coding() as u64; - assert_eq!(e_meta.status(&index), StillNeed(NUM_DATA)); + assert_eq!(e_meta.status(&index), StillNeed(erasure_config.num_data())); index .data_mut() @@ -267,7 +280,7 @@ mod test { for &idx in data_indexes .clone() .collect::>() - .choose_multiple(&mut rng, NUM_DATA) + .choose_multiple(&mut rng, erasure_config.num_data()) { index.data_mut().set_present(idx, false); @@ -280,7 +293,7 @@ mod test { for &idx in coding_indexes .collect::>() - .choose_multiple(&mut rng, NUM_CODING) + .choose_multiple(&mut rng, erasure_config.num_coding()) { index.coding_mut().set_present(idx, false); diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index f558e52d67..17db905de9 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -5,7 +5,7 @@ use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastR use self::standard_broadcast_run::StandardBroadcastRun; use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, ClusterInfoError}; -use crate::erasure::CodingGenerator; +use crate::erasure::{CodingGenerator, ErasureConfig}; use crate::poh_recorder::WorkingBankEntries; use crate::result::{Error, Result}; use crate::service::Service; @@ -51,6 +51,7 @@ impl BroadcastStageType { receiver: Receiver, exit_sender: &Arc, blocktree: &Arc, + erasure_config: &ErasureConfig, ) -> BroadcastStage { match self { BroadcastStageType::Standard => BroadcastStage::new( @@ -60,6 +61,7 @@ impl BroadcastStageType { exit_sender, blocktree, StandardBroadcastRun::new(), + erasure_config, ), BroadcastStageType::FailEntryVerification => BroadcastStage::new( @@ -69,6 +71,7 @@ impl BroadcastStageType { exit_sender, blocktree, FailEntryVerificationBroadcastRun::new(), + erasure_config, ), BroadcastStageType::BroadcastFakeBlobs => BroadcastStage::new( @@ -78,6 +81,7 @@ impl BroadcastStageType { exit_sender, blocktree, BroadcastFakeBlobsRun::new(0), + erasure_config, ), BroadcastStageType::BroadcastBadBlobSizes => BroadcastStage::new( @@ -87,6 +91,7 @@ impl BroadcastStageType { exit_sender, blocktree, BroadcastBadBlobSizes::new(), + erasure_config, ), } } @@ -138,8 +143,9 @@ impl BroadcastStage { receiver: &Receiver, blocktree: &Arc, mut broadcast_stage_run: impl BroadcastRun, + erasure_config: &ErasureConfig, ) -> BroadcastStageReturnType { - let coding_generator = CodingGenerator::default(); + let coding_generator = CodingGenerator::new_from_config(erasure_config); let mut broadcast = Broadcast { coding_generator, @@ -191,9 +197,11 @@ impl BroadcastStage { exit_sender: &Arc, blocktree: &Arc, broadcast_stage_run: impl BroadcastRun + Send + 'static, + erasure_config: &ErasureConfig, ) -> Self { let blocktree = blocktree.clone(); let exit_sender = exit_sender.clone(); + let erasure_config = *erasure_config; let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { @@ -204,6 +212,7 @@ impl BroadcastStage { &receiver, &blocktree, broadcast_stage_run, + &erasure_config, ) }) .unwrap(); @@ -277,6 +286,7 @@ mod test { &exit_sender, &blocktree, StandardBroadcastRun::new(), + &ErasureConfig::default(), ); MockBroadcastStage { diff --git a/core/src/chacha.rs b/core/src/chacha.rs index 7352334baa..877ad2b627 100644 --- a/core/src/chacha.rs +++ b/core/src/chacha.rs @@ -135,7 +135,7 @@ mod tests { hasher.hash(&buf[..size]); // golden needs to be updated if blob stuff changes.... - let golden: Hash = "5FzYtpCqL7v6ZxZ1fW4wRkn8TK96NdiD8cLV59Rr7yav" + let golden: Hash = "Dy2V98ybxnp1mDTqXrUbsLE5LyKVpQN5zDhrEKCDEFhH" .parse() .unwrap(); diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 48f1447b65..7607a42d12 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1652,6 +1652,7 @@ mod tests { use crate::blocktree::tests::make_many_slot_entries; use crate::blocktree::Blocktree; use crate::crds_value::CrdsValueLabel; + use crate::erasure::ErasureConfig; use crate::packet::BLOB_HEADER_SIZE; use crate::repair_service::RepairType; use crate::result::Error; @@ -1816,6 +1817,7 @@ mod tests { w_blob.set_size(data_size); w_blob.set_index(1); w_blob.set_slot(2); + w_blob.set_erasure_config(&ErasureConfig::default()); w_blob.meta.size = data_size + BLOB_HEADER_SIZE; } @@ -1860,6 +1862,7 @@ mod tests { blob.set_size(data_size); blob.set_index(i); blob.set_slot(2); + blob.set_erasure_config(&ErasureConfig::default()); blob.meta.size = data_size + BLOB_HEADER_SIZE; blob }) diff --git a/core/src/erasure.rs b/core/src/erasure.rs index f1216ccbfe..f7b801b63d 100644 --- a/core/src/erasure.rs +++ b/core/src/erasure.rs @@ -55,6 +55,38 @@ pub const NUM_CODING: usize = 8; /// Total number of blobs in an erasure set; includes data and coding blobs pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING; +#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct ErasureConfig { + num_data: usize, + num_coding: usize, +} + +impl Default for ErasureConfig { + fn default() -> ErasureConfig { + ErasureConfig { + num_data: NUM_DATA, + num_coding: NUM_CODING, + } + } +} + +impl ErasureConfig { + pub fn new(num_data: usize, num_coding: usize) -> ErasureConfig { + ErasureConfig { + num_data, + num_coding, + } + } + + pub fn num_data(self) -> usize { + self.num_data + } + + pub fn num_coding(self) -> usize { + self.num_coding + } +} + type Result = std::result::Result; /// Represents an erasure "session" with a particular configuration and number of data and coding @@ -77,6 +109,12 @@ impl Session { Ok(Session(rs)) } + pub fn new_from_config(config: &ErasureConfig) -> Result { + let rs = ReedSolomon::new(config.num_data, config.num_coding)?; + + Ok(Session(rs)) + } + /// Create coding blocks by overwriting `parity` pub fn encode(&self, data: &[&[u8]], parity: &mut [&mut [u8]]) -> Result<()> { self.0.encode_sep(data, parity)?; @@ -136,7 +174,7 @@ impl Session { let idx; let first_byte; - if n < NUM_DATA { + if n < self.0.data_shard_count() { let mut blob = Blob::new(&blocks[n]); blob.meta.size = blob.data_size() as usize; @@ -181,6 +219,13 @@ impl CodingGenerator { } } + pub fn new_from_config(config: &ErasureConfig) -> Self { + CodingGenerator { + leftover: Vec::with_capacity(config.num_data), + session: Arc::new(Session::new_from_config(config).unwrap()), + } + } + /// Yields next set of coding blobs, if any. /// Must be called with consecutive data blobs within a slot. /// @@ -235,6 +280,7 @@ impl CodingGenerator { coding_blob.set_id(&id); coding_blob.set_size(max_data_size); coding_blob.set_coding(); + coding_blob.set_erasure_config(&data_blob.erasure_config()); coding_blobs.push(coding_blob); } @@ -744,6 +790,7 @@ pub mod test { let mut blob = Blob::default(); blob.data_mut()[..].copy_from_slice(&data); blob.set_size(BLOB_DATA_SIZE); + blob.set_erasure_config(&ErasureConfig::default()); Arc::new(RwLock::new(blob)) }) .collect(); diff --git a/core/src/packet.rs b/core/src/packet.rs index fd1ffcf9cd..4376ee5279 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -1,5 +1,6 @@ //! The `packet` module defines data structures and methods to pull data from the network. use crate::cuda_runtime::PinnedVec; +use crate::erasure::ErasureConfig; use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS}; use crate::recycler::{Recycler, Reset}; use crate::result::{Error, Result}; @@ -389,7 +390,8 @@ const SLOT_RANGE: std::ops::Range = range!(PARENT_RANGE.end, u64); const INDEX_RANGE: std::ops::Range = range!(SLOT_RANGE.end, u64); const ID_RANGE: std::ops::Range = range!(INDEX_RANGE.end, Pubkey); const FLAGS_RANGE: std::ops::Range = range!(ID_RANGE.end, u32); -const SIZE_RANGE: std::ops::Range = range!(FLAGS_RANGE.end, u64); +const ERASURE_CONFIG_RANGE: std::ops::Range = range!(FLAGS_RANGE.end, ErasureConfig); +const SIZE_RANGE: std::ops::Range = range!(ERASURE_CONFIG_RANGE.end, u64); macro_rules! align { ($x:expr, $align:expr) => { @@ -426,6 +428,7 @@ impl Blob { out.position() as usize }; blob.set_size(pos); + blob.set_erasure_config(&ErasureConfig::default()); blob } @@ -448,6 +451,14 @@ impl Blob { LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix); } + pub fn set_erasure_config(&mut self, config: &ErasureConfig) { + self.data[ERASURE_CONFIG_RANGE].copy_from_slice(&bincode::serialize(config).unwrap()) + } + + pub fn erasure_config(&self) -> ErasureConfig { + bincode::deserialize(&self.data[ERASURE_CONFIG_RANGE]).unwrap_or_default() + } + pub fn seed(&self) -> [u8; 32] { let mut seed = [0; 32]; let seed_len = seed.len(); @@ -807,6 +818,15 @@ mod tests { assert!(!b.should_forward()); } + #[test] + fn test_blob_erasure_config() { + let mut b = Blob::default(); + let config = ErasureConfig::new(32, 16); + b.set_erasure_config(&config); + + assert_eq!(config, b.erasure_config()); + } + #[test] fn test_store_blobs_max() { let serialized_size_size = bincode::serialized_size(&0usize).unwrap() as usize; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 7d5fcb28e6..83893d77ca 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -687,6 +687,7 @@ mod test { use super::*; use crate::blocktree::get_tmp_ledger_path; use crate::entry; + use crate::erasure::ErasureConfig; use crate::genesis_utils::create_genesis_block; use crate::packet::{Blob, BLOB_HEADER_SIZE}; use crate::replay_stage::ReplayStage; @@ -716,6 +717,7 @@ mod test { let mut blob_slot_1 = Blob::default(); blob_slot_1.set_slot(1); blob_slot_1.set_parent(0); + blob_slot_1.set_erasure_config(&ErasureConfig::default()); blocktree.insert_data_blobs(&vec![blob_slot_1]).unwrap(); assert!(bank_forks.get(1).is_none()); ReplayStage::generate_new_bank_forks( @@ -729,6 +731,7 @@ mod test { let mut blob_slot_2 = Blob::default(); blob_slot_2.set_slot(2); blob_slot_2.set_parent(0); + blob_slot_2.set_erasure_config(&ErasureConfig::default()); blocktree.insert_data_blobs(&vec![blob_slot_2]).unwrap(); assert!(bank_forks.get(2).is_none()); ReplayStage::generate_new_bank_forks( diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 630a1b8780..cb064b33dc 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -6,6 +6,7 @@ use crate::blocktree::Blocktree; use crate::broadcast_stage::{BroadcastStage, BroadcastStageType}; use crate::cluster_info::ClusterInfo; use crate::cluster_info_vote_listener::ClusterInfoVoteListener; +use crate::erasure::ErasureConfig; use crate::fetch_stage::FetchStage; use crate::poh_recorder::{PohRecorder, WorkingBankEntries}; use crate::service::Service; @@ -37,6 +38,7 @@ impl Tpu { sigverify_disabled: bool, blocktree: &Arc, broadcast_type: &BroadcastStageType, + erasure_config: &ErasureConfig, exit: &Arc, ) -> Self { let (packet_sender, packet_receiver) = channel(); @@ -74,6 +76,7 @@ impl Tpu { entry_receiver, &exit, blocktree, + erasure_config, ); Self { diff --git a/core/src/validator.rs b/core/src/validator.rs index f08ebbb76f..2a2e7dd477 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -6,6 +6,7 @@ use crate::blocktree_processor::{self, BankForksInfo}; use crate::broadcast_stage::BroadcastStageType; use crate::cluster_info::{ClusterInfo, Node}; use crate::contact_info::ContactInfo; +use crate::erasure::ErasureConfig; use crate::gossip_service::{discover_cluster, GossipService}; use crate::leader_schedule_cache::LeaderScheduleCache; use crate::poh_recorder::PohRecorder; @@ -40,6 +41,7 @@ pub struct ValidatorConfig { pub rpc_config: JsonRpcConfig, pub snapshot_path: Option, pub broadcast_stage_type: BroadcastStageType, + pub erasure_config: ErasureConfig, } impl Default for ValidatorConfig { @@ -53,6 +55,7 @@ impl Default for ValidatorConfig { rpc_config: JsonRpcConfig::default(), snapshot_path: None, broadcast_stage_type: BroadcastStageType::Standard, + erasure_config: ErasureConfig::default(), } } } @@ -264,6 +267,7 @@ impl Validator { config.sigverify_disabled, &blocktree, &config.broadcast_stage_type, + &config.erasure_config, &exit, );