From 8135279335a10a095049bab729f2290a888e7e57 Mon Sep 17 00:00:00 2001
From: Pankaj Garg
Date: Thu, 12 Sep 2019 21:52:13 -0700
Subject: [PATCH] Reduce serialize/deserialize in shred recovery (#5887)

---
 core/src/blocktree.rs |  80 ++++++++++++------
 core/src/shred.rs     | 185 ++++++++++++++++++++++++++----------------
 2 files changed, 172 insertions(+), 93 deletions(-)

diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index b31d7f0a4..a97f2fbd4 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -4,7 +4,7 @@
 use crate::entry::Entry;
 use crate::erasure::ErasureConfig;
 use crate::result::{Error, Result};
-use crate::shred::{Shred, Shredder};
+use crate::shred::{Shred, ShredMetaBuf, Shredder};
 #[cfg(feature = "kvstore")]
 use solana_kvstore as kvstore;

@@ -320,8 +320,8 @@ impl Blocktree {
         db: &Database,
         erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
         index_working_set: &HashMap<u64, Index>,
-        prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
-        prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
+        prev_inserted_datas: &mut HashMap<(u64, u64), ShredMetaBuf>,
+        prev_inserted_codes: &mut HashMap<(u64, u64), ShredMetaBuf>,
     ) -> Vec<Shred> {
         let data_cf = db.column::<cf::ShredData>();
         let code_cf = db.column::<cf::ShredCode>();
@@ -357,7 +357,12 @@ impl Blocktree {
                             .get_bytes((slot, i))
                             .expect("Database failure, could not fetch data shred");
                         if let Some(data) = some_data {
-                            bincode::deserialize(&data).ok()
+                            Some(ShredMetaBuf {
+                                slot,
+                                index: i as u32,
+                                data_shred: true,
+                                shred_buf: data,
+                            })
                         } else {
                             warn!("Data shred deleted while reading for recovery");
                             None
@@ -377,7 +382,12 @@ impl Blocktree {
                             .get_bytes((slot, i))
                             .expect("Database failure, could not fetch code shred");
                         if let Some(code) = some_code {
-                            bincode::deserialize(&code).ok()
+                            Some(ShredMetaBuf {
+                                slot,
+                                index: i as u32,
+                                data_shred: false,
+                                shred_buf: code,
+                            })
                         } else {
                             warn!("Code shred deleted while reading for recovery");
                             None
@@ -390,7 +400,7 @@ impl Blocktree {
                 },
             );
             if let Ok(mut result) = Shredder::try_recovery(
-                &available_shreds,
+                available_shreds,
                 erasure_meta.config.num_data(),
                 erasure_meta.config.num_coding(),
                 set_index as usize,
@@ -513,7 +523,7 @@ impl Blocktree {
         erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
         index_working_set: &mut HashMap<u64, Index>,
         write_batch: &mut WriteBatch,
-        just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>,
+        just_inserted_coding_shreds: &mut HashMap<(u64, u64), ShredMetaBuf>,
     ) {
         let slot = shred.slot();
         let shred_index = u64::from(shred.index());
@@ -524,15 +534,21 @@ impl Blocktree {
         let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap());
         // This gives the index of first coding shred in this FEC block
         // So, all coding shreds in a given FEC block will have the same set index
-        if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root)
-            && self
-                .insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
-                .is_ok()
-        {
-            just_inserted_coding_shreds
-                .entry((slot, shred_index))
-                .or_insert_with(|| shred);
-            new_index_meta.map(|n| index_working_set.insert(slot, n));
+        if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) {
+            if let Ok(shred_buf) =
+                self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
+            {
+                let shred_meta = ShredMetaBuf {
+                    slot,
+                    index: shred_index as u32,
+                    data_shred: false,
+                    shred_buf,
+                };
+                just_inserted_coding_shreds
+                    .entry((slot, shred_index))
+                    .or_insert_with(|| shred_meta);
+                new_index_meta.map(|n| index_working_set.insert(slot, n));
+            }
         }
     }

@@ -542,7 +558,7 @@ impl Blocktree {
         index_working_set: &mut HashMap<u64, Index>,
         slot_meta_working_set: &mut HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
         write_batch: &mut WriteBatch,
-        just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
+        just_inserted_data_shreds: &mut HashMap<(u64, u64), ShredMetaBuf>,
     ) {
         let slot = shred.slot();
         let shred_index = u64::from(shred.index());
@@ -562,16 +578,30 @@ impl Blocktree {
                 index_meta.data(),
                 &self.last_root,
             ) {
-                self.insert_data_shred(&mut slot_meta, index_meta.data_mut(), &shred, write_batch)
-                    .is_ok()
+                if let Ok(shred_buf) = self.insert_data_shred(
+                    &mut slot_meta,
+                    index_meta.data_mut(),
+                    &shred,
+                    write_batch,
+                ) {
+                    let shred_meta = ShredMetaBuf {
+                        slot,
+                        index: shred_index as u32,
+                        data_shred: true,
+                        shred_buf,
+                    };
+                    just_inserted_data_shreds.insert((slot, shred_index), shred_meta);
+                    new_index_meta.map(|n| index_working_set.insert(slot, n));
+                    true
+                } else {
+                    false
+                }
             } else {
                 false
            }
        };

        if insert_success {
-            just_inserted_data_shreds.insert((slot, shred_index), shred);
-            new_index_meta.map(|n| index_working_set.insert(slot, n));
            new_slot_meta_entry.map(|n| slot_meta_working_set.insert(slot, n));
        }
    }
@@ -613,7 +643,7 @@ impl Blocktree {
         index_meta: &mut Index,
         shred: &Shred,
         write_batch: &mut WriteBatch,
-    ) -> Result<()> {
+    ) -> Result<Vec<u8>> {
         let slot = shred.slot();
         let shred_index = u64::from(shred.index());
         let (num_data, num_coding, pos) = {
@@ -663,7 +693,7 @@ impl Blocktree {
         write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &serialized_shred)?;
         index_meta.coding_mut().set_present(shred_index, true);

-        Ok(())
+        Ok(serialized_shred)
     }

     fn should_insert_data_shred(
@@ -758,7 +788,7 @@ impl Blocktree {
         data_index: &mut DataIndex,
         shred: &Shred,
         write_batch: &mut WriteBatch,
-    ) -> Result<()> {
+    ) -> Result<Vec<u8>> {
         let slot = shred.slot();
         let index = u64::from(shred.index());
         let parent = shred.parent();
@@ -802,7 +832,7 @@ impl Blocktree {
         update_slot_meta(last_in_slot, slot_meta, index, new_consumed);
         data_index.set_present(index, true);
         trace!("inserted shred into slot {:?} and index {:?}", slot, index);
-        Ok(())
+        Ok(serialized_shred)
     }

     pub fn get_data_shred(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
diff --git a/core/src/shred.rs b/core/src/shred.rs
index a1bc7d909..9cae97e34 100644
--- a/core/src/shred.rs
+++ b/core/src/shred.rs
@@ -12,6 +12,14 @@ use std::io::{Error as IOError, ErrorKind, Write};
 use std::sync::Arc;
 use std::{cmp, io};

+#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
+pub struct ShredMetaBuf {
+    pub slot: u64,
+    pub index: u32,
+    pub data_shred: bool,
+    pub shred_buf: Vec<u8>,
+}
+
 #[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
 pub enum Shred {
     FirstInSlot(DataShred),
@@ -141,6 +149,14 @@ impl Shred {
         self.signature()
             .verify(pubkey.as_ref(), &shred_buf[signed_payload_offset..])
     }
+
+    pub fn is_data(&self) -> bool {
+        if let Shred::Coding(_) = self {
+            false
+        } else {
+            true
+        }
+    }
 }

 /// A common header that is present at start of every shred
@@ -524,7 +540,7 @@ impl Shredder {
     }

     fn fill_in_missing_shreds(
-        shred: &Shred,
+        shred: &ShredMetaBuf,
         num_data: usize,
         num_coding: usize,
         slot: u64,
@@ -540,14 +556,12 @@ impl Shredder {
             return (vec![], index);
         }

-        let mut missing_blocks: Vec<Vec<u8>> = (expected_index..index)
+        let missing_blocks: Vec<Vec<u8>> = (expected_index..index)
             .map(|missing| {
                 present[missing.saturating_sub(first_index)] = false;
                 Shredder::new_empty_missing_shred(num_data, num_coding, slot, first_index, missing)
             })
             .collect();
-        let shred_buf = bincode::serialize(shred).unwrap();
-        missing_blocks.push(shred_buf);

         (missing_blocks, index)
     }

@@ -581,7 +595,7 @@ impl Shredder {
     }

     pub fn try_recovery(
-        shreds: &[Shred],
+        shreds: Vec<ShredMetaBuf>,
         num_data: usize,
         num_coding: usize,
         first_index: usize,
@@ -597,10 +611,10 @@ impl Shredder {
         let mut present = &mut vec![true; fec_set_size];
         let mut next_expected_index = first_index;
         let mut shred_bufs: Vec<Vec<u8>> = shreds
-            .iter()
+            .into_iter()
             .flat_map(|shred| {
-                let (blocks, last_index) = Self::fill_in_missing_shreds(
-                    shred,
+                let (mut blocks, last_index) = Self::fill_in_missing_shreds(
+                    &shred,
                     num_data,
                     num_coding,
                     slot,
@@ -608,6 +622,7 @@ impl Shredder {
                     next_expected_index,
                     &mut present,
                 );
+                blocks.push(shred.shred_buf);
                 next_expected_index = last_index + 1;
                 blocks
             })
@@ -711,11 +726,11 @@ impl Shredder {
         Ok(Self::reassemble_payload(num_data, data_shred_bufs))
     }

-    fn get_shred_index(shred: &Shred, num_data: usize) -> usize {
-        if let Shred::Coding(_) = shred {
-            shred.index() as usize + num_data
+    fn get_shred_index(shred: &ShredMetaBuf, num_data: usize) -> usize {
+        if shred.data_shred {
+            shred.index as usize
         } else {
-            shred.index() as usize
+            shred.index as usize + num_data
         }
     }

@@ -1070,16 +1085,26 @@ mod tests {
         let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
         assert_eq!(shredder.shred_tuples.len(), expected_shred_count);

-        let shreds: Vec<Shred> = shredder
+        let (shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder
             .shred_tuples
             .iter()
-            .map(|(s, _)| s.clone())
-            .collect();
+            .map(|(s, b)| {
+                (
+                    s.clone(),
+                    ShredMetaBuf {
+                        slot: s.slot(),
+                        index: s.index(),
+                        data_shred: s.is_data(),
+                        shred_buf: b.clone(),
+                    },
+                )
+            })
+            .unzip();

         // Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail
         assert_matches!(
             Shredder::try_recovery(
-                &shreds[..4],
+                shred_meta_bufs[..4].to_vec(),
                 expected_shred_count / 2,
                 expected_shred_count / 2,
                 0,
@@ -1090,7 +1115,7 @@ mod tests {

         // Test1: Try recovery/reassembly with only data shreds. Hint: should work
         let result = Shredder::try_recovery(
-            &shreds[..5],
+            shred_meta_bufs[..5].to_vec(),
             expected_shred_count / 2,
             expected_shred_count / 2,
             0,
@@ -1105,23 +1130,29 @@ mod tests {
         assert_eq!(data[..], result[..data.len()]);

         // Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work
-        let mut shreds: Vec<Shred> = shredder
+        let (mut shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder
             .shred_tuples
             .iter()
             .enumerate()
-            .filter_map(
-                |(i, (s, _))| {
-                    if i % 2 == 0 {
-                        Some(s.clone())
-                    } else {
-                        None
-                    }
-                },
-            )
-            .collect();
+            .filter_map(|(i, (s, b))| {
+                if i % 2 == 0 {
+                    Some((
+                        s.clone(),
+                        ShredMetaBuf {
+                            slot: s.slot(),
+                            index: s.index(),
+                            data_shred: s.is_data(),
+                            shred_buf: b.clone(),
+                        },
+                    ))
+                } else {
+                    None
+                }
+            })
+            .unzip();

         let mut result = Shredder::try_recovery(
-            &shreds,
+            shred_meta_bufs,
             expected_shred_count / 2,
             expected_shred_count / 2,
             0,
@@ -1178,23 +1209,29 @@ mod tests {
         assert_eq!(data[..], result[..data.len()]);

         // Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work
-        let mut shreds: Vec<Shred> = shredder
+        let (mut shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder
             .shred_tuples
             .iter()
             .enumerate()
-            .filter_map(
-                |(i, (s, _))| {
-                    if i % 2 != 0 {
-                        Some(s.clone())
-                    } else {
-                        None
-                    }
-                },
-            )
-            .collect();
+            .filter_map(|(i, (s, b))| {
+                if i % 2 != 0 {
+                    Some((
+                        s.clone(),
+                        ShredMetaBuf {
+                            slot: s.slot(),
+                            index: s.index(),
+                            data_shred: s.is_data(),
+                            shred_buf: b.clone(),
+                        },
+                    ))
+                } else {
+                    None
+                }
+            })
+            .unzip();

         let mut result = Shredder::try_recovery(
-            &shreds,
+            shred_meta_bufs,
             expected_shred_count / 2,
             expected_shred_count / 2,
             0,
@@ -1274,23 +1311,29 @@ mod tests {
         let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
         assert_eq!(shredder.shred_tuples.len(), expected_shred_count);

-        let mut shreds: Vec<Shred> = shredder
+        let (mut shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder
             .shred_tuples
             .iter()
             .enumerate()
-            .filter_map(
-                |(i, (s, _))| {
-                    if i % 2 != 0 {
-                        Some(s.clone())
-                    } else {
-                        None
-                    }
-                },
-            )
-            .collect();
+            .filter_map(|(i, (s, b))| {
+                if i % 2 != 0 {
+                    Some((
+                        s.clone(),
+                        ShredMetaBuf {
+                            slot: s.slot(),
+                            index: s.index(),
+                            data_shred: s.is_data(),
+                            shred_buf: b.clone(),
+                        },
+                    ))
+                } else {
+                    None
+                }
+            })
+            .unzip();

         let mut result = Shredder::try_recovery(
-            &shreds,
+            shred_meta_bufs,
             expected_shred_count / 2,
             expected_shred_count / 2,
             0,
@@ -1390,23 +1433,29 @@ mod tests {
         let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
         assert_eq!(shredder.shred_tuples.len(), expected_shred_count);

-        let mut shreds: Vec<Shred> = shredder
+        let (mut shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder
             .shred_tuples
             .iter()
             .enumerate()
-            .filter_map(
-                |(i, (s, _))| {
-                    if i % 2 != 0 {
-                        Some(s.clone())
-                    } else {
-                        None
-                    }
-                },
-            )
-            .collect();
+            .filter_map(|(i, (s, b))| {
+                if i % 2 != 0 {
+                    Some((
+                        s.clone(),
+                        ShredMetaBuf {
+                            slot: s.slot(),
+                            index: s.index(),
+                            data_shred: s.is_data(),
+                            shred_buf: b.clone(),
+                        },
+                    ))
+                } else {
+                    None
+                }
+            })
+            .unzip();

         let mut result = Shredder::try_recovery(
-            &shreds,
+            shred_meta_bufs.clone(),
             expected_shred_count / 2,
             expected_shred_count / 2,
             25,
@@ -1464,7 +1513,7 @@ mod tests {

         // Test7: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds
         let result = Shredder::try_recovery(
-            &shreds,
+            shred_meta_bufs.clone(),
             expected_shred_count / 2,
             expected_shred_count / 2,
             25,
@@ -1476,7 +1525,7 @@ mod tests {
         // Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds
         assert_matches!(
             Shredder::try_recovery(
-                &shreds,
+                shred_meta_bufs.clone(),
                 expected_shred_count / 2,
                 expected_shred_count / 2,
                 15,
@@ -1488,7 +1537,7 @@ mod tests {
         // Test9: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds
         assert_matches!(
             Shredder::try_recovery(
-                &shreds,
+                shred_meta_bufs.clone(),
                 expected_shred_count / 2,
                 expected_shred_count / 2,
                 35,
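Reviewer note, not part of the patch: below is a minimal sketch of the pattern this change applies. ShredMetaBuf mirrors the struct added in core/src/shred.rs; the toy Shred struct, its fields, and the main() driver are hypothetical stand-ins (the real Shred is an enum carrying headers and a signature), and serde derive plus bincode 1.x are assumed, matching what this crate already uses. The point is that erasure recovery only needs raw buffers plus slot/index/type tags, so the bytes fetched from the blocktree (or returned by insert_data_shred/insert_coding_shred) can be moved into try_recovery unchanged instead of being deserialized and re-serialized:

use serde::{Deserialize, Serialize};

// Hypothetical stand-in for the real `Shred` enum.
#[derive(Serialize, Deserialize)]
struct Shred {
    slot: u64,
    index: u32,
    payload: Vec<u8>,
}

// Mirrors the `ShredMetaBuf` introduced by this patch.
struct ShredMetaBuf {
    slot: u64,
    index: u32,
    data_shred: bool,
    shred_buf: Vec<u8>,
}

fn main() {
    let shred = Shred {
        slot: 9,
        index: 3,
        payload: vec![1, 2, 3],
    };
    // The blocktree stores shreds in serialized form.
    let stored: Vec<u8> = bincode::serialize(&shred).unwrap();

    // Old path: deserialize the fetched bytes into a `Shred`, only for
    // `fill_in_missing_shreds` to serialize that `Shred` straight back
    // into a byte buffer for the erasure decoder.
    let parsed: Shred = bincode::deserialize(&stored).unwrap();
    let round_tripped = bincode::serialize(&parsed).unwrap();
    assert_eq!(round_tripped, stored); // the round trip was pure overhead

    // New path: tag the fetched bytes with the metadata recovery actually
    // needs (slot, index, data-vs-coding) and move the buffer through as is.
    let recovery_input = ShredMetaBuf {
        slot: shred.slot,
        index: shred.index,
        data_shred: true,
        shred_buf: stored,
    };
    assert_eq!(recovery_input.shred_buf, round_tripped);
}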