caches reed-solomon encoder/decoder instance (#27510)

ReedSolomon::new(...) initializes a matrix and a data-decode-matrix cache:
https://github.com/rust-rse/reed-solomon-erasure/blob/273ebbced/src/core.rs#L460-L466

To avoid repeating this work on every call, this commit caches the reed-solomon
encoder/decoder instance for each (data_shards, parity_shards) pair.
behzad nouri 2022-09-25 18:09:47 +00:00 committed by GitHub
parent 9816c94d7e
commit f49beb0cbc
15 changed files with 233 additions and 54 deletions
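For reference, here is a minimal, self-contained sketch of the caching pattern this commit introduces. The ReedSolomonCache type and its get() signature follow the shredder diff below; the new(capacity) constructor and the main() demo are illustrative additions, and an lru 0.7-style LruCache::new(usize) constructor is assumed (newer lru releases take a NonZeroUsize).

```rust
use {
    lru::LruCache,
    reed_solomon_erasure::galois_8::ReedSolomon,
    std::sync::{Arc, Mutex},
};

/// Keyed by (data_shards, parity_shards); each distinct shape gets one
/// lazily built encoder/decoder that all callers share through an Arc.
struct ReedSolomonCache(Mutex<LruCache<(usize, usize), Arc<ReedSolomon>>>);

impl ReedSolomonCache {
    // Illustrative constructor; the diff below uses a fixed CAPACITY instead.
    // lru 0.7-style API: LruCache::new takes a plain usize.
    fn new(capacity: usize) -> Self {
        Self(Mutex::new(LruCache::new(capacity)))
    }

    fn get(
        &self,
        data_shards: usize,
        parity_shards: usize,
    ) -> Result<Arc<ReedSolomon>, reed_solomon_erasure::Error> {
        let key = (data_shards, parity_shards);
        // Fast path: reuse an instance already built for this shape.
        if let Some(entry) = self.0.lock().unwrap().get(&key) {
            return Ok(entry.clone());
        }
        // Slow path: build outside the lock (the matrix setup is the
        // expensive part), then publish it for subsequent callers.
        let entry = Arc::new(ReedSolomon::new(data_shards, parity_shards)?);
        self.0.lock().unwrap().put(key, entry.clone());
        Ok(entry)
    }
}

fn main() -> Result<(), reed_solomon_erasure::Error> {
    let cache = ReedSolomonCache::new(4);
    let a = cache.get(32, 32)?; // builds and caches the (32, 32) instance
    let b = cache.get(32, 32)?; // served from the cache
    assert!(Arc::ptr_eq(&a, &b)); // same underlying encoder/decoder
    Ok(())
}
```

As in the diff, the ReedSolomon instance is built outside the lock so the critical sections stay short; two threads racing on the same key at worst build the instance twice, and the second put simply replaces the first entry.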

View File

@ -15,7 +15,7 @@ use {
solana_ledger::{
genesis_utils::{create_genesis_config, GenesisConfigInfo},
leader_schedule_cache::LeaderScheduleCache,
shred::{ProcessShredsStats, Shredder},
shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
},
solana_measure::measure::Measure,
solana_runtime::{bank::Bank, bank_forks::BankForks},
@ -107,6 +107,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);

View File

@ -8,8 +8,8 @@ use {
raptorq::{Decoder, Encoder},
solana_entry::entry::{create_ticks, Entry},
solana_ledger::shred::{
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, ShredFlags,
Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, ReedSolomonCache,
Shred, ShredFlags, Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
},
solana_perf::test_tx,
solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair},
@ -53,6 +53,7 @@ fn make_shreds(num_shreds: usize) -> Vec<Shred> {
0, // next_shred_index
0, // next_code_index
false, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
assert!(data_shreds.len() >= num_shreds);
@ -78,6 +79,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
// ~1Mb
let num_ticks = max_ticks_per_n_shreds(1, Some(LEGACY_SHRED_DATA_CAPACITY)) * num_shreds as u64;
let entries = create_ticks(num_ticks, 0, Hash::default());
let reed_solomon_cache = ReedSolomonCache::default();
bencher.iter(|| {
let shredder = Shredder::new(1, 0, 0, 0).unwrap();
shredder.entries_to_shreds(
@ -87,6 +89,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
0,
0,
true, // merkle_variant
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
})
@ -104,6 +107,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
Some(shred_size),
);
let entries = make_large_unchained_entries(txs_per_entry, num_entries);
let reed_solomon_cache = ReedSolomonCache::default();
// 1Mb
bencher.iter(|| {
let shredder = Shredder::new(1, 0, 0, 0).unwrap();
@ -114,6 +118,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
0,
0,
true, // merkle_variant
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
})
@ -135,6 +140,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
0,
0,
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
bencher.iter(|| {
@ -159,10 +165,12 @@ fn bench_deserialize_hdr(bencher: &mut Bencher) {
fn bench_shredder_coding(bencher: &mut Bencher) {
let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data_shreds = make_shreds(symbol_count);
let reed_solomon_cache = ReedSolomonCache::default();
bencher.iter(|| {
Shredder::generate_coding_shreds(
&data_shreds[..symbol_count],
0, // next_code_index
&reed_solomon_cache,
)
.len();
})
@ -172,12 +180,14 @@ fn bench_shredder_coding(bencher: &mut Bencher) {
fn bench_shredder_decoding(bencher: &mut Bencher) {
let symbol_count = DATA_SHREDS_PER_FEC_BLOCK;
let data_shreds = make_shreds(symbol_count);
let reed_solomon_cache = ReedSolomonCache::default();
let coding_shreds = Shredder::generate_coding_shreds(
&data_shreds[..symbol_count],
0, // next_code_index
&reed_solomon_cache,
);
bencher.iter(|| {
Shredder::try_recovery(coding_shreds[..].to_vec()).unwrap();
Shredder::try_recovery(coding_shreds[..].to_vec(), &reed_solomon_cache).unwrap();
})
}

View File

@ -443,7 +443,7 @@ pub mod test {
blockstore::Blockstore,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path,
shred::{max_ticks_per_n_shreds, ProcessShredsStats, Shredder},
shred::{max_ticks_per_n_shreds, ProcessShredsStats, ReedSolomonCache, Shredder},
},
solana_runtime::bank::Bank,
solana_sdk::{
@ -482,6 +482,7 @@ pub mod test {
0, // next_shred_index,
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
(

View File

@ -4,7 +4,7 @@ use {
itertools::Itertools,
solana_entry::entry::Entry,
solana_gossip::contact_info::ContactInfo,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{
hash::Hash,
signature::{Keypair, Signature, Signer},
@ -36,6 +36,7 @@ pub(super) struct BroadcastDuplicatesRun {
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
original_last_data_shreds: Arc<Mutex<HashSet<Signature>>>,
partition_last_data_shreds: Arc<Mutex<HashSet<Signature>>>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}
impl BroadcastDuplicatesRun {
@ -56,6 +57,7 @@ impl BroadcastDuplicatesRun {
cluster_nodes_cache,
original_last_data_shreds: Arc::<Mutex<HashSet<Signature>>>::default(),
partition_last_data_shreds: Arc::<Mutex<HashSet<Signature>>>::default(),
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
}
@ -164,6 +166,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
self.next_shred_index,
self.next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
@ -180,6 +183,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
self.next_shred_index,
self.next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
// Don't mark the last shred as last so that validators won't
@ -192,6 +196,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
self.next_shred_index,
self.next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
let sigs: Vec<_> = partition_last_data_shred

View File

@ -1,7 +1,7 @@
use {
super::*,
solana_entry::entry::Entry,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{hash::Hash, signature::Keypair},
};
@ -11,6 +11,7 @@ pub(super) struct BroadcastFakeShredsRun {
partition: usize,
shred_version: u16,
next_code_index: u32,
reed_solomon_cache: Arc<ReedSolomonCache>,
}
impl BroadcastFakeShredsRun {
@ -20,6 +21,7 @@ impl BroadcastFakeShredsRun {
partition,
shred_version,
next_code_index: 0,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
}
@ -61,6 +63,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
@ -81,6 +84,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

View File

@ -1,7 +1,7 @@
use {
super::*,
crate::cluster_nodes::ClusterNodesCache,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{hash::Hash, signature::Keypair},
std::{thread::sleep, time::Duration},
};
@ -17,6 +17,7 @@ pub(super) struct FailEntryVerificationBroadcastRun {
next_shred_index: u32,
next_code_index: u32,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}
impl FailEntryVerificationBroadcastRun {
@ -32,6 +33,7 @@ impl FailEntryVerificationBroadcastRun {
next_shred_index: 0,
next_code_index: 0,
cluster_nodes_cache,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
}
@ -92,6 +94,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
@ -107,6 +110,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
// Don't mark the last shred as last so that validators won't know
@ -119,6 +123,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
self.next_shred_index += 1;

View File

@ -9,7 +9,7 @@ use {
broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache,
},
solana_entry::entry::Entry,
solana_ledger::shred::{ProcessShredsStats, Shred, ShredFlags, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder},
solana_sdk::{
signature::Keypair,
timing::{duration_as_us, AtomicInterval},
@ -29,6 +29,7 @@ pub struct StandardBroadcastRun {
last_datapoint_submit: Arc<AtomicInterval>,
num_batches: usize,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
reed_solomon_cache: Arc<ReedSolomonCache>,
}
impl StandardBroadcastRun {
@ -48,6 +49,7 @@ impl StandardBroadcastRun {
last_datapoint_submit: Arc::default(),
num_batches: 0,
cluster_nodes_cache,
reed_solomon_cache: Arc::<ReedSolomonCache>::default(),
}
}
@ -77,6 +79,7 @@ impl StandardBroadcastRun {
state.next_shred_index,
state.next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
stats,
);
self.report_and_reset_stats(true);
@ -126,6 +129,7 @@ impl StandardBroadcastRun {
next_shred_index,
next_code_index,
false, // merkle_variant
&self.reed_solomon_cache,
process_stats,
);
let next_shred_index = match data_shreds.iter().map(Shred::index).max() {

View File

@ -251,7 +251,7 @@ mod tests {
super::*,
solana_ledger::{
blockstore::MAX_DATA_SHREDS_PER_SLOT,
shred::{Shred, ShredFlags},
shred::{ReedSolomonCache, Shred, ShredFlags},
},
};
@ -294,6 +294,7 @@ mod tests {
let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
&[shred],
3, // next_code_index
&ReedSolomonCache::default(),
);
coding[0].copy_to_packet(&mut packet);
assert!(!should_discard_packet(

View File

@ -16,7 +16,7 @@ use {
solana_ledger::{
blockstore::{Blockstore, BlockstoreInsertionMetrics},
leader_schedule_cache::LeaderScheduleCache,
shred::{self, Nonce, Shred},
shred::{self, Nonce, ReedSolomonCache, Shred},
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
@ -220,6 +220,7 @@ fn run_insert<F>(
completed_data_sets_sender: &CompletedDataSetsSender,
retransmit_sender: &Sender<Vec<ShredPayload>>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
reed_solomon_cache: &ReedSolomonCache,
) -> Result<()>
where
F: Fn(Shred),
@ -282,6 +283,7 @@ where
false, // is_trusted
Some(retransmit_sender),
&handle_duplicate,
reed_solomon_cache,
metrics,
)?;
for index in inserted_indices {
@ -411,6 +413,7 @@ impl WindowService {
.thread_name(|i| format!("solWinInsert{:02}", i))
.build()
.unwrap();
let reed_solomon_cache = ReedSolomonCache::default();
Builder::new()
.name("solWinInsert".to_string())
.spawn(move || {
@ -432,6 +435,7 @@ impl WindowService {
&completed_data_sets_sender,
&retransmit_sender,
&outstanding_requests,
&reed_solomon_cache,
) {
ws_metrics.record_error(&e);
if Self::should_exit_on_error(e, &handle_error) {
@ -507,6 +511,7 @@ mod test {
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
data_shreds

View File

@ -284,7 +284,7 @@ pub(crate) mod tests {
super::*,
rand::Rng,
solana_entry::entry::Entry,
solana_ledger::shred::{ProcessShredsStats, Shredder},
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
solana_sdk::{
hash,
signature::{Keypair, Signer},
@ -343,6 +343,7 @@ pub(crate) mod tests {
next_shred_index,
next_shred_index, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
data_shreds.swap_remove(0)

View File

@ -17,8 +17,8 @@ use {
leader_schedule_cache::LeaderScheduleCache,
next_slots_iterator::NextSlotsIterator,
shred::{
self, max_ticks_per_n_shreds, ErasureSetId, ProcessShredsStats, Shred, ShredData,
ShredId, ShredType, Shredder,
self, max_ticks_per_n_shreds, ErasureSetId, ProcessShredsStats, ReedSolomonCache,
Shred, ShredData, ShredId, ShredType, Shredder,
},
slot_stats::{ShredSource, SlotsStats},
},
@ -634,6 +634,7 @@ impl Blockstore {
recovered_shreds: &mut Vec<Shred>,
data_cf: &LedgerColumn<cf::ShredData>,
code_cf: &LedgerColumn<cf::ShredCode>,
reed_solomon_cache: &ReedSolomonCache,
) {
// Find shreds for this erasure set and try recovery
let slot = index.slot;
@ -652,7 +653,7 @@ impl Blockstore {
code_cf,
))
.collect();
if let Ok(mut result) = shred::recover(available_shreds) {
if let Ok(mut result) = shred::recover(available_shreds, reed_solomon_cache) {
Self::submit_metrics(slot, erasure_meta, true, "complete".into(), result.len());
recovered_shreds.append(&mut result);
} else {
@ -712,6 +713,7 @@ impl Blockstore {
erasure_metas: &HashMap<ErasureSetId, ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_shreds: &HashMap<ShredId, Shred>,
reed_solomon_cache: &ReedSolomonCache,
) -> Vec<Shred> {
let data_cf = db.column::<cf::ShredData>();
let code_cf = db.column::<cf::ShredCode>();
@ -734,6 +736,7 @@ impl Blockstore {
&mut recovered_shreds,
&data_cf,
&code_cf,
reed_solomon_cache,
);
}
ErasureMetaStatus::DataFull => {
@ -812,6 +815,7 @@ impl Blockstore {
is_trusted: bool,
retransmit_sender: Option<&Sender<Vec</*shred:*/ Vec<u8>>>>,
handle_duplicate: &F,
reed_solomon_cache: &ReedSolomonCache,
metrics: &mut BlockstoreInsertionMetrics,
) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)>
where
@ -899,6 +903,7 @@ impl Blockstore {
&erasure_metas,
&mut index_working_set,
&just_inserted_shreds,
reed_solomon_cache,
);
metrics.num_recovered += recovered_shreds
@ -1098,6 +1103,7 @@ impl Blockstore {
is_trusted,
None, // retransmit-sender
&|_| {}, // handle-duplicates
&ReedSolomonCache::default(),
&mut BlockstoreInsertionMetrics::default(),
)
}
@ -1704,6 +1710,7 @@ impl Blockstore {
let mut shredder = Shredder::new(current_slot, parent_slot, 0, version).unwrap();
let mut all_shreds = vec![];
let mut slot_entries = vec![];
let reed_solomon_cache = ReedSolomonCache::default();
// Find all the entries for start_slot
for entry in entries.into_iter() {
if remaining_ticks_in_slot == 0 {
@ -1725,6 +1732,7 @@ impl Blockstore {
start_index, // next_shred_index
start_index, // next_code_index
true, // merkle_variant
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
all_shreds.append(&mut data_shreds);
@ -1752,6 +1760,7 @@ impl Blockstore {
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
all_shreds.append(&mut data_shreds);
@ -3867,6 +3876,7 @@ pub fn create_new_ledger(
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
assert!(shreds.last().unwrap().last_in_slot());
@ -4133,6 +4143,7 @@ pub fn entries_to_test_shreds(
0, // next_shred_index,
0, // next_code_index
merkle_variant,
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
)
.0
@ -8770,6 +8781,7 @@ pub mod tests {
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
@ -8824,6 +8836,7 @@ pub mod tests {
let entries1 = make_slot_entries_with_transactions(1);
let entries2 = make_slot_entries_with_transactions(1);
let leader_keypair = Arc::new(Keypair::new());
let reed_solomon_cache = ReedSolomonCache::default();
let shredder = Shredder::new(slot, 0, 0, 0).unwrap();
let (shreds, _) = shredder.entries_to_shreds(
&leader_keypair,
@ -8832,6 +8845,7 @@ pub mod tests {
0, // next_shred_index
0, // next_code_index,
true, // merkle_variant
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
let (duplicate_shreds, _) = shredder.entries_to_shreds(
@ -8841,6 +8855,7 @@ pub mod tests {
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
let shred = shreds[0].clone();
@ -9188,8 +9203,17 @@ pub mod tests {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let coding1 = Shredder::generate_coding_shreds(&shreds, /*next_code_index:*/ 0);
let coding2 = Shredder::generate_coding_shreds(&shreds, /*next_code_index:*/ 1);
let reed_solomon_cache = ReedSolomonCache::default();
let coding1 = Shredder::generate_coding_shreds(
&shreds,
0, // next_code_index
&reed_solomon_cache,
);
let coding2 = Shredder::generate_coding_shreds(
&shreds,
1, // next_code_index
&reed_solomon_cache,
);
for shred in &shreds {
info!("shred {:?}", shred);
}

View File

@ -76,7 +76,7 @@ pub use {
shred_data::ShredData,
stats::{ProcessShredsStats, ShredFetchStats},
},
crate::shredder::Shredder,
crate::shredder::{ReedSolomonCache, Shredder},
};
mod common;
@ -717,20 +717,25 @@ impl TryFrom<u8> for ShredVariant {
}
}
pub(crate) fn recover(shreds: Vec<Shred>) -> Result<Vec<Shred>, Error> {
pub(crate) fn recover(
shreds: Vec<Shred>,
reed_solomon_cache: &ReedSolomonCache,
) -> Result<Vec<Shred>, Error> {
match shreds
.first()
.ok_or(TooFewShardsPresent)?
.common_header()
.shred_variant
{
ShredVariant::LegacyData | ShredVariant::LegacyCode => Shredder::try_recovery(shreds),
ShredVariant::LegacyData | ShredVariant::LegacyCode => {
Shredder::try_recovery(shreds, reed_solomon_cache)
}
ShredVariant::MerkleCode(_) | ShredVariant::MerkleData(_) => {
let shreds = shreds
.into_iter()
.map(merkle::Shred::try_from)
.collect::<Result<_, _>>()?;
Ok(merkle::recover(shreds)?
Ok(merkle::recover(shreds, reed_solomon_cache)?
.into_iter()
.map(Shred::from)
.collect())
@ -750,6 +755,7 @@ pub(crate) fn make_merkle_shreds_from_entries(
is_last_in_slot: bool,
next_shred_index: u32,
next_code_index: u32,
reed_solomon_cache: &ReedSolomonCache,
stats: &mut ProcessShredsStats,
) -> Result<Vec<Shred>, Error> {
let now = Instant::now();
@ -766,6 +772,7 @@ pub(crate) fn make_merkle_shreds_from_entries(
is_last_in_slot,
next_shred_index,
next_code_index,
reed_solomon_cache,
stats,
)?;
Ok(shreds.into_iter().flatten().map(Shred::from).collect())

View File

@ -13,7 +13,7 @@ use {
ShredFlags, ShredVariant, DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_CODING_SHRED_HEADERS,
SIZE_OF_DATA_SHRED_HEADERS, SIZE_OF_SIGNATURE,
},
shredder::{self, ReedSolomon},
shredder::{self, ReedSolomonCache},
},
assert_matches::debug_assert_matches,
itertools::Itertools,
@ -597,7 +597,10 @@ fn make_merkle_branch(
Some(MerkleBranch { root, proof })
}
pub(super) fn recover(mut shreds: Vec<Shred>) -> Result<Vec<Shred>, Error> {
pub(super) fn recover(
mut shreds: Vec<Shred>,
reed_solomon_cache: &ReedSolomonCache,
) -> Result<Vec<Shred>, Error> {
// Grab {common, coding} headers from first coding shred.
let headers = shreds.iter().find_map(|shred| {
let shred = match shred {
@ -674,7 +677,9 @@ pub(super) fn recover(mut shreds: Vec<Shred>) -> Result<Vec<Shred>, Error> {
.iter()
.map(|shred| Some(shred.as_ref()?.erasure_shard_as_slice().ok()?.to_vec()))
.collect();
ReedSolomon::new(num_data_shreds, num_coding_shreds)?.reconstruct(&mut shards)?;
reed_solomon_cache
.get(num_data_shreds, num_coding_shreds)?
.reconstruct(&mut shards)?;
let mask: Vec<_> = shreds.iter().map(Option::is_some).collect();
// Reconstruct code and data shreds from erasure encoded shards.
let mut shreds: Vec<_> = shreds
@ -778,6 +783,7 @@ pub(super) fn make_shreds_from_data(
is_last_in_slot: bool,
next_shred_index: u32,
next_code_index: u32,
reed_solomon_cache: &ReedSolomonCache,
stats: &mut ProcessShredsStats,
) -> Result<Vec</*erasure batch:*/ Vec<Shred>>, Error> {
fn new_shred_data(
@ -916,7 +922,9 @@ pub(super) fn make_shreds_from_data(
shreds
.into_par_iter()
.zip(next_code_index)
.map(|(shreds, next_code_index)| make_erasure_batch(keypair, shreds, next_code_index))
.map(|(shreds, next_code_index)| {
make_erasure_batch(keypair, shreds, next_code_index, reed_solomon_cache)
})
.collect::<Result<Vec<_>, Error>>()
});
stats.gen_coding_elapsed += now.elapsed().as_micros() as u64;
@ -929,6 +937,7 @@ fn make_erasure_batch(
keypair: &Keypair,
shreds: Vec<ShredData>,
next_code_index: u32,
reed_solomon_cache: &ReedSolomonCache,
) -> Result<Vec<Shred>, Error> {
let num_data_shreds = shreds.len();
let erasure_batch_size = shredder::get_erasure_batch_size(num_data_shreds);
@ -949,7 +958,9 @@ fn make_erasure_batch(
// Shreds should have erasure encoded shard of the same length.
debug_assert_eq!(data.iter().map(|shard| shard.len()).dedup().count(), 1);
let mut parity = vec![vec![0u8; data[0].len()]; num_coding_shreds];
ReedSolomon::new(num_data_shreds, num_coding_shreds)?.encode_sep(&data, &mut parity[..])?;
reed_solomon_cache
.get(num_data_shreds, num_coding_shreds)?
.encode_sep(&data, &mut parity[..])?;
let mut shreds: Vec<_> = shreds.into_iter().map(Shred::ShredData).collect();
// Initialize coding shreds from erasure coding shards.
common_header.index = next_code_index;
@ -1114,9 +1125,15 @@ mod test {
#[test_case(73)]
fn test_recover_merkle_shreds(num_shreds: usize) {
let mut rng = rand::thread_rng();
let reed_solomon_cache = ReedSolomonCache::default();
for num_data_shreds in 1..num_shreds {
let num_coding_shreds = num_shreds - num_data_shreds;
run_recover_merkle_shreds(&mut rng, num_data_shreds, num_coding_shreds);
run_recover_merkle_shreds(
&mut rng,
num_data_shreds,
num_coding_shreds,
&reed_solomon_cache,
);
}
}
@ -1124,6 +1141,7 @@ mod test {
rng: &mut R,
num_data_shreds: usize,
num_coding_shreds: usize,
reed_solomon_cache: &ReedSolomonCache,
) {
let keypair = Keypair::generate(rng);
let num_shreds = num_data_shreds + num_coding_shreds;
@ -1177,7 +1195,8 @@ mod test {
.collect::<Result<_, _>>()
.unwrap();
let mut parity = vec![vec![0u8; data[0].len()]; num_coding_shreds];
ReedSolomon::new(num_data_shreds, num_coding_shreds)
reed_solomon_cache
.get(num_data_shreds, num_coding_shreds)
.unwrap()
.encode_sep(&data, &mut parity[..])
.unwrap();
@ -1237,12 +1256,12 @@ mod test {
)
}) {
assert_matches!(
recover(shreds),
recover(shreds, reed_solomon_cache),
Err(Error::ErasureError(TooFewParityShards))
);
continue;
}
let recovered_shreds = recover(shreds).unwrap();
let recovered_shreds = recover(shreds, reed_solomon_cache).unwrap();
assert_eq!(size + recovered_shreds.len(), num_shreds);
assert_eq!(recovered_shreds.len(), removed_shreds.len());
removed_shreds.sort_by(|a, b| {
@ -1287,21 +1306,27 @@ mod test {
fn test_make_shreds_from_data(data_size: usize) {
let mut rng = rand::thread_rng();
let data_size = data_size.saturating_sub(16).max(1);
let reed_solomon_cache = ReedSolomonCache::default();
for data_size in (data_size..data_size + 32).step_by(3) {
run_make_shreds_from_data(&mut rng, data_size);
run_make_shreds_from_data(&mut rng, data_size, &reed_solomon_cache);
}
}
#[test]
fn test_make_shreds_from_data_rand() {
let mut rng = rand::thread_rng();
let reed_solomon_cache = ReedSolomonCache::default();
for _ in 0..32 {
let data_size = rng.gen_range(0, 31200 * 7);
run_make_shreds_from_data(&mut rng, data_size);
run_make_shreds_from_data(&mut rng, data_size, &reed_solomon_cache);
}
}
fn run_make_shreds_from_data<R: Rng>(rng: &mut R, data_size: usize) {
fn run_make_shreds_from_data<R: Rng>(
rng: &mut R,
data_size: usize,
reed_solomon_cache: &ReedSolomonCache,
) {
let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
let keypair = Keypair::new();
let slot = 149_745_689;
@ -1323,6 +1348,7 @@ mod test {
true, // is_last_in_slot
next_shred_index,
next_code_index,
reed_solomon_cache,
&mut ProcessShredsStats::default(),
)
.unwrap();
@ -1433,7 +1459,7 @@ mod test {
})
.group_by(|shred| shred.common_header().fec_set_index)
.into_iter()
.flat_map(|(_, shreds)| recover(shreds.collect()).unwrap())
.flat_map(|(_, shreds)| recover(shreds.collect(), reed_solomon_cache).unwrap())
.collect();
assert_eq!(recovered_data_shreds.len(), data_shreds.len());
for (shred, other) in recovered_data_shreds.into_iter().zip(data_shreds) {

View File

@ -4,6 +4,7 @@ use {
},
itertools::Itertools,
lazy_static::lazy_static,
lru::LruCache,
rayon::{prelude::*, ThreadPool},
reed_solomon_erasure::{
galois_8::Field,
@ -13,7 +14,11 @@ use {
solana_measure::measure::Measure,
solana_rayon_threadlimit::get_thread_count,
solana_sdk::{clock::Slot, signature::Keypair},
std::{borrow::Borrow, fmt::Debug},
std::{
borrow::Borrow,
fmt::Debug,
sync::{Arc, Mutex},
},
};
lazy_static! {
@ -33,7 +38,11 @@ pub(crate) const ERASURE_BATCH_SIZE: [usize; 33] = [
55, 56, 58, 59, 60, 62, 63, 64, // 32
];
pub(crate) type ReedSolomon = reed_solomon_erasure::ReedSolomon<Field>;
type ReedSolomon = reed_solomon_erasure::ReedSolomon<Field>;
pub struct ReedSolomonCache(
Mutex<LruCache<(/*data_shards:*/ usize, /*parity_shards:*/ usize), Arc<ReedSolomon>>>,
);
#[derive(Debug)]
pub struct Shredder {
@ -70,6 +79,7 @@ impl Shredder {
next_shred_index: u32,
next_code_index: u32,
merkle_variant: bool,
reed_solomon_cache: &ReedSolomonCache,
stats: &mut ProcessShredsStats,
) -> (
Vec<Shred>, // data shreds
@ -87,6 +97,7 @@ impl Shredder {
is_last_in_slot,
next_shred_index,
next_code_index,
reed_solomon_cache,
stats,
)
.unwrap()
@ -95,9 +106,14 @@ impl Shredder {
}
let data_shreds =
self.entries_to_data_shreds(keypair, entries, is_last_in_slot, next_shred_index, stats);
let coding_shreds =
Self::data_shreds_to_coding_shreds(keypair, &data_shreds, next_code_index, stats)
.unwrap();
let coding_shreds = Self::data_shreds_to_coding_shreds(
keypair,
&data_shreds,
next_code_index,
reed_solomon_cache,
stats,
)
.unwrap();
(data_shreds, coding_shreds)
}
@ -174,6 +190,7 @@ impl Shredder {
keypair: &Keypair,
data_shreds: &[Shred],
next_code_index: u32,
reed_solomon_cache: &ReedSolomonCache,
process_stats: &mut ProcessShredsStats,
) -> Result<Vec<Shred>, Error> {
if data_shreds.is_empty() {
@ -204,7 +221,7 @@ impl Shredder {
.into_par_iter()
.zip(next_code_index)
.flat_map(|(shreds, next_code_index)| {
Shredder::generate_coding_shreds(&shreds, next_code_index)
Shredder::generate_coding_shreds(&shreds, next_code_index, reed_solomon_cache)
})
.collect()
});
@ -228,6 +245,7 @@ impl Shredder {
pub fn generate_coding_shreds<T: Borrow<Shred>>(
data: &[T],
next_code_index: u32,
reed_solomon_cache: &ReedSolomonCache,
) -> Vec<Shred> {
let (slot, index, version, fec_set_index) = {
let shred = data.first().unwrap().borrow();
@ -257,7 +275,8 @@ impl Shredder {
.collect::<Result<_, _>>()
.unwrap();
let mut parity = vec![vec![0u8; data[0].len()]; num_coding];
ReedSolomon::new(num_data, num_coding)
reed_solomon_cache
.get(num_data, num_coding)
.unwrap()
.encode_sep(&data, &mut parity[..])
.unwrap();
@ -282,7 +301,10 @@ impl Shredder {
.collect()
}
pub fn try_recovery(shreds: Vec<Shred>) -> Result<Vec<Shred>, Error> {
pub fn try_recovery(
shreds: Vec<Shred>,
reed_solomon_cache: &ReedSolomonCache,
) -> Result<Vec<Shred>, Error> {
let (slot, fec_set_index) = match shreds.first() {
None => return Err(Error::from(TooFewShardsPresent)),
Some(shred) => (shred.slot(), shred.fec_set_index()),
@ -322,7 +344,9 @@ impl Shredder {
mask[index] = true;
}
}
ReedSolomon::new(num_data_shreds, num_coding_shreds)?.reconstruct_data(&mut shards)?;
reed_solomon_cache
.get(num_data_shreds, num_coding_shreds)?
.reconstruct_data(&mut shards)?;
let recovered_data = mask
.into_iter()
.zip(shards)
@ -365,6 +389,38 @@ impl Shredder {
}
}
impl ReedSolomonCache {
const CAPACITY: usize = 4 * DATA_SHREDS_PER_FEC_BLOCK;
pub(crate) fn get(
&self,
data_shards: usize,
parity_shards: usize,
) -> Result<Arc<ReedSolomon>, reed_solomon_erasure::Error> {
let key = (data_shards, parity_shards);
{
let mut cache = self.0.lock().unwrap();
if let Some(entry) = cache.get(&key) {
return Ok(entry.clone());
}
}
let entry = ReedSolomon::new(data_shards, parity_shards)?;
let entry = Arc::new(entry);
{
let entry = entry.clone();
let mut cache = self.0.lock().unwrap();
cache.put(key, entry);
}
Ok(entry)
}
}
impl Default for ReedSolomonCache {
fn default() -> Self {
Self(Mutex::new(LruCache::new(Self::CAPACITY)))
}
}
/// Maps number of data shreds in each batch to the erasure batch size.
pub(crate) fn get_erasure_batch_size(num_data_shreds: usize) -> usize {
ERASURE_BATCH_SIZE
@ -464,6 +520,7 @@ mod tests {
start_index, // next_shred_index
start_index, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
let next_index = data_shreds.last().unwrap().index() + 1;
@ -542,6 +599,7 @@ mod tests {
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
let deserialized_shred =
@ -573,6 +631,7 @@ mod tests {
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
data_shreds.iter().for_each(|s| {
@ -609,6 +668,7 @@ mod tests {
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
data_shreds.iter().for_each(|s| {
@ -654,6 +714,7 @@ mod tests {
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
for (i, s) in data_shreds.iter().enumerate() {
@ -701,6 +762,7 @@ mod tests {
})
.collect();
let reed_solomon_cache = ReedSolomonCache::default();
let serialized_entries = bincode::serialize(&entries).unwrap();
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
&keypair,
@ -709,6 +771,7 @@ mod tests {
0, // next_shred_index
0, // next_code_index
false, // merkle_variant
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
let num_coding_shreds = coding_shreds.len();
@ -728,12 +791,17 @@ mod tests {
// Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail
assert_eq!(
Shredder::try_recovery(data_shreds[..data_shreds.len() - 1].to_vec()).unwrap(),
Shredder::try_recovery(
data_shreds[..data_shreds.len() - 1].to_vec(),
&reed_solomon_cache
)
.unwrap(),
Vec::default()
);
// Test1: Try recovery/reassembly with only data shreds. Hint: should work
let recovered_data = Shredder::try_recovery(data_shreds[..].to_vec()).unwrap();
let recovered_data =
Shredder::try_recovery(data_shreds[..].to_vec(), &reed_solomon_cache).unwrap();
assert!(recovered_data.is_empty());
// Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work
@ -743,7 +811,8 @@ mod tests {
.filter_map(|(i, b)| if i % 2 == 0 { Some(b.clone()) } else { None })
.collect();
let mut recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap();
let mut recovered_data =
Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap();
assert_eq!(recovered_data.len(), 2); // Data shreds 1 and 3 were missing
let recovered_shred = recovered_data.remove(0);
@ -783,7 +852,8 @@ mod tests {
.filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None })
.collect();
let recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap();
let recovered_data =
Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap();
assert_eq!(recovered_data.len(), 3); // Data shreds 0, 2, 4 were missing
for (i, recovered_shred) in recovered_data.into_iter().enumerate() {
@ -838,6 +908,7 @@ mod tests {
25, // next_shred_index,
25, // next_code_index
false, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
// We should have 10 shreds now
@ -855,7 +926,8 @@ mod tests {
.filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None })
.collect();
let recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap();
let recovered_data =
Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap();
assert_eq!(recovered_data.len(), 3); // Data shreds 25, 27, 29 were missing
for (i, recovered_shred) in recovered_data.into_iter().enumerate() {
@ -879,7 +951,8 @@ mod tests {
assert_eq!(serialized_entries[..], result[..serialized_entries.len()]);
// Test6: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds
let recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap();
let recovered_data =
Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap();
assert!(recovered_data.is_empty());
}
@ -923,6 +996,7 @@ mod tests {
)
.unwrap();
let next_shred_index = rng.gen_range(1, 1024);
let reed_solomon_cache = ReedSolomonCache::default();
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
&keypair,
&[entry],
@ -930,6 +1004,7 @@ mod tests {
next_shred_index,
next_shred_index, // next_code_index
false, // merkle_variant
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
let num_data_shreds = data_shreds.len();
@ -949,7 +1024,7 @@ mod tests {
.filter(|shred| shred.is_data())
.map(|shred| shred.index())
.collect();
let recovered_shreds = Shredder::try_recovery(shreds).unwrap();
let recovered_shreds = Shredder::try_recovery(shreds, &reed_solomon_cache).unwrap();
assert_eq!(
recovered_shreds,
data_shreds
@ -991,6 +1066,7 @@ mod tests {
0, // next_shred_index
0, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
assert!(!data_shreds
@ -1024,6 +1100,7 @@ mod tests {
start_index, // next_shred_index
start_index, // next_code_index
true, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
const MIN_CHUNK_SIZE: usize = DATA_SHREDS_PER_FEC_BLOCK;
@ -1084,6 +1161,7 @@ mod tests {
);
let next_code_index = data_shreds[0].index();
let reed_solomon_cache = ReedSolomonCache::default();
for size in (1..data_shreds.len()).step_by(5) {
let data_shreds = &data_shreds[..size];
@ -1091,6 +1169,7 @@ mod tests {
&keypair,
data_shreds,
next_code_index,
&reed_solomon_cache,
&mut stats,
)
.unwrap();

View File

@ -2,8 +2,8 @@
use {
solana_entry::entry::Entry,
solana_ledger::shred::{
max_entries_per_n_shred, verify_test_data_shred, ProcessShredsStats, Shred, Shredder,
DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
max_entries_per_n_shred, verify_test_data_shred, ProcessShredsStats, ReedSolomonCache,
Shred, Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
},
solana_sdk::{
clock::Slot,
@ -47,6 +47,7 @@ fn test_multi_fec_block_coding() {
})
.collect();
let reed_solomon_cache = ReedSolomonCache::default();
let serialized_entries = bincode::serialize(&entries).unwrap();
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
&keypair,
@ -55,6 +56,7 @@ fn test_multi_fec_block_coding() {
0, // next_shred_index
0, // next_code_index
false, // merkle_variant
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
let next_index = data_shreds.last().unwrap().index() + 1;
@ -82,7 +84,8 @@ fn test_multi_fec_block_coding() {
.filter_map(|(i, b)| if i % 2 != 0 { Some(b.clone()) } else { None })
.collect();
let recovered_data = Shredder::try_recovery(shred_info.clone()).unwrap();
let recovered_data =
Shredder::try_recovery(shred_info.clone(), &reed_solomon_cache).unwrap();
for (i, recovered_shred) in recovered_data.into_iter().enumerate() {
let index = shred_start_index + (i * 2);
@ -116,6 +119,7 @@ fn test_multi_fec_block_different_size_coding() {
setup_different_sized_fec_blocks(slot, parent_slot, keypair.clone());
let total_num_data_shreds: usize = fec_data.values().map(|x| x.len()).sum();
let reed_solomon_cache = ReedSolomonCache::default();
// Test recovery
for (fec_data_shreds, fec_coding_shreds) in fec_data.values().zip(fec_coding.values()) {
let first_data_index = fec_data_shreds.first().unwrap().index() as usize;
@ -125,7 +129,7 @@ fn test_multi_fec_block_different_size_coding() {
.chain(fec_coding_shreds.iter().step_by(2))
.cloned()
.collect();
let recovered_data = Shredder::try_recovery(all_shreds).unwrap();
let recovered_data = Shredder::try_recovery(all_shreds, &reed_solomon_cache).unwrap();
// Necessary in order to ensure the last shred in the slot
// is part of the recovered set, and that the below `index`
// calculation in the loop is correct
@ -219,6 +223,7 @@ fn setup_different_sized_fec_blocks(
let mut coding_slot_and_index = HashSet::new();
let total_num_data_shreds: usize = 2 * num_shreds_per_iter;
let reed_solomon_cache = ReedSolomonCache::default();
for i in 0..2 {
let is_last = i == 1;
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
@ -228,6 +233,7 @@ fn setup_different_sized_fec_blocks(
next_shred_index,
next_code_index,
false, // merkle_variant
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
for shred in &data_shreds {