Reduce serialize/deserialize in shred recovery (#5887)

This commit is contained in:
Pankaj Garg 2019-09-12 21:52:13 -07:00 committed by GitHub
parent 5dceeec1ca
commit 8135279335
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 172 additions and 93 deletions

View File

@ -4,7 +4,7 @@
use crate::entry::Entry;
use crate::erasure::ErasureConfig;
use crate::result::{Error, Result};
use crate::shred::{Shred, Shredder};
use crate::shred::{Shred, ShredMetaBuf, Shredder};
#[cfg(feature = "kvstore")]
use solana_kvstore as kvstore;
@ -320,8 +320,8 @@ impl Blocktree {
db: &Database,
erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
index_working_set: &HashMap<u64, Index>,
prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
prev_inserted_datas: &mut HashMap<(u64, u64), ShredMetaBuf>,
prev_inserted_codes: &mut HashMap<(u64, u64), ShredMetaBuf>,
) -> Vec<Shred> {
let data_cf = db.column::<cf::ShredData>();
let code_cf = db.column::<cf::ShredCode>();
@ -357,7 +357,12 @@ impl Blocktree {
.get_bytes((slot, i))
.expect("Database failure, could not fetch data shred");
if let Some(data) = some_data {
bincode::deserialize(&data).ok()
Some(ShredMetaBuf {
slot,
index: i as u32,
data_shred: true,
shred_buf: data,
})
} else {
warn!("Data shred deleted while reading for recovery");
None
@ -377,7 +382,12 @@ impl Blocktree {
.get_bytes((slot, i))
.expect("Database failure, could not fetch code shred");
if let Some(code) = some_code {
bincode::deserialize(&code).ok()
Some(ShredMetaBuf {
slot,
index: i as u32,
data_shred: false,
shred_buf: code,
})
} else {
warn!("Code shred deleted while reading for recovery");
None
@ -390,7 +400,7 @@ impl Blocktree {
},
);
if let Ok(mut result) = Shredder::try_recovery(
&available_shreds,
available_shreds,
erasure_meta.config.num_data(),
erasure_meta.config.num_coding(),
set_index as usize,
@ -513,7 +523,7 @@ impl Blocktree {
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, Index>,
write_batch: &mut WriteBatch,
just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>,
just_inserted_coding_shreds: &mut HashMap<(u64, u64), ShredMetaBuf>,
) {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
@ -524,15 +534,21 @@ impl Blocktree {
let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap());
// This gives the index of first coding shred in this FEC block
// So, all coding shreds in a given FEC block will have the same set index
if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root)
&& self
.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
.is_ok()
{
just_inserted_coding_shreds
.entry((slot, shred_index))
.or_insert_with(|| shred);
new_index_meta.map(|n| index_working_set.insert(slot, n));
if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) {
if let Ok(shred_buf) =
self.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
{
let shred_meta = ShredMetaBuf {
slot,
index: shred_index as u32,
data_shred: false,
shred_buf,
};
just_inserted_coding_shreds
.entry((slot, shred_index))
.or_insert_with(|| shred_meta);
new_index_meta.map(|n| index_working_set.insert(slot, n));
}
}
}
@ -542,7 +558,7 @@ impl Blocktree {
index_working_set: &mut HashMap<u64, Index>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
just_inserted_data_shreds: &mut HashMap<(u64, u64), ShredMetaBuf>,
) {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
@ -562,16 +578,30 @@ impl Blocktree {
index_meta.data(),
&self.last_root,
) {
self.insert_data_shred(&mut slot_meta, index_meta.data_mut(), &shred, write_batch)
.is_ok()
if let Ok(shred_buf) = self.insert_data_shred(
&mut slot_meta,
index_meta.data_mut(),
&shred,
write_batch,
) {
let shred_meta = ShredMetaBuf {
slot,
index: shred_index as u32,
data_shred: true,
shred_buf,
};
just_inserted_data_shreds.insert((slot, shred_index), shred_meta);
new_index_meta.map(|n| index_working_set.insert(slot, n));
true
} else {
false
}
} else {
false
}
};
if insert_success {
just_inserted_data_shreds.insert((slot, shred_index), shred);
new_index_meta.map(|n| index_working_set.insert(slot, n));
new_slot_meta_entry.map(|n| slot_meta_working_set.insert(slot, n));
}
}
@ -613,7 +643,7 @@ impl Blocktree {
index_meta: &mut Index,
shred: &Shred,
write_batch: &mut WriteBatch,
) -> Result<()> {
) -> Result<Vec<u8>> {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
let (num_data, num_coding, pos) = {
@ -663,7 +693,7 @@ impl Blocktree {
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &serialized_shred)?;
index_meta.coding_mut().set_present(shred_index, true);
Ok(())
Ok(serialized_shred)
}
fn should_insert_data_shred(
@ -758,7 +788,7 @@ impl Blocktree {
data_index: &mut DataIndex,
shred: &Shred,
write_batch: &mut WriteBatch,
) -> Result<()> {
) -> Result<Vec<u8>> {
let slot = shred.slot();
let index = u64::from(shred.index());
let parent = shred.parent();
@ -802,7 +832,7 @@ impl Blocktree {
update_slot_meta(last_in_slot, slot_meta, index, new_consumed);
data_index.set_present(index, true);
trace!("inserted shred into slot {:?} and index {:?}", slot, index);
Ok(())
Ok(serialized_shred)
}
pub fn get_data_shred(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {

View File

@ -12,6 +12,14 @@ use std::io::{Error as IOError, ErrorKind, Write};
use std::sync::Arc;
use std::{cmp, io};
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct ShredMetaBuf {
pub slot: u64,
pub index: u32,
pub data_shred: bool,
pub shred_buf: Vec<u8>,
}
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub enum Shred {
FirstInSlot(DataShred),
@ -141,6 +149,14 @@ impl Shred {
self.signature()
.verify(pubkey.as_ref(), &shred_buf[signed_payload_offset..])
}
pub fn is_data(&self) -> bool {
if let Shred::Coding(_) = self {
false
} else {
true
}
}
}
/// A common header that is present at start of every shred
@ -524,7 +540,7 @@ impl Shredder {
}
fn fill_in_missing_shreds(
shred: &Shred,
shred: &ShredMetaBuf,
num_data: usize,
num_coding: usize,
slot: u64,
@ -540,14 +556,12 @@ impl Shredder {
return (vec![], index);
}
let mut missing_blocks: Vec<Vec<u8>> = (expected_index..index)
let missing_blocks: Vec<Vec<u8>> = (expected_index..index)
.map(|missing| {
present[missing.saturating_sub(first_index)] = false;
Shredder::new_empty_missing_shred(num_data, num_coding, slot, first_index, missing)
})
.collect();
let shred_buf = bincode::serialize(shred).unwrap();
missing_blocks.push(shred_buf);
(missing_blocks, index)
}
@ -581,7 +595,7 @@ impl Shredder {
}
pub fn try_recovery(
shreds: &[Shred],
shreds: Vec<ShredMetaBuf>,
num_data: usize,
num_coding: usize,
first_index: usize,
@ -597,10 +611,10 @@ impl Shredder {
let mut present = &mut vec![true; fec_set_size];
let mut next_expected_index = first_index;
let mut shred_bufs: Vec<Vec<u8>> = shreds
.iter()
.into_iter()
.flat_map(|shred| {
let (blocks, last_index) = Self::fill_in_missing_shreds(
shred,
let (mut blocks, last_index) = Self::fill_in_missing_shreds(
&shred,
num_data,
num_coding,
slot,
@ -608,6 +622,7 @@ impl Shredder {
next_expected_index,
&mut present,
);
blocks.push(shred.shred_buf);
next_expected_index = last_index + 1;
blocks
})
@ -711,11 +726,11 @@ impl Shredder {
Ok(Self::reassemble_payload(num_data, data_shred_bufs))
}
fn get_shred_index(shred: &Shred, num_data: usize) -> usize {
if let Shred::Coding(_) = shred {
shred.index() as usize + num_data
fn get_shred_index(shred: &ShredMetaBuf, num_data: usize) -> usize {
if shred.data_shred {
shred.index as usize
} else {
shred.index() as usize
shred.index as usize + num_data
}
}
@ -1070,16 +1085,26 @@ mod tests {
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
let shreds: Vec<Shred> = shredder
let (shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder
.shred_tuples
.iter()
.map(|(s, _)| s.clone())
.collect();
.map(|(s, b)| {
(
s.clone(),
ShredMetaBuf {
slot: s.slot(),
index: s.index(),
data_shred: s.is_data(),
shred_buf: b.clone(),
},
)
})
.unzip();
// Test0: Try recovery/reassembly with only data shreds, but not all data shreds. Hint: should fail
assert_matches!(
Shredder::try_recovery(
&shreds[..4],
shred_meta_bufs[..4].to_vec(),
expected_shred_count / 2,
expected_shred_count / 2,
0,
@ -1090,7 +1115,7 @@ mod tests {
// Test1: Try recovery/reassembly with only data shreds. Hint: should work
let result = Shredder::try_recovery(
&shreds[..5],
shred_meta_bufs[..5].to_vec(),
expected_shred_count / 2,
expected_shred_count / 2,
0,
@ -1105,23 +1130,29 @@ mod tests {
assert_eq!(data[..], result[..data.len()]);
// Test2: Try recovery/reassembly with missing data shreds + coding shreds. Hint: should work
let mut shreds: Vec<Shred> = shredder
let (mut shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder
.shred_tuples
.iter()
.enumerate()
.filter_map(
|(i, (s, _))| {
if i % 2 == 0 {
Some(s.clone())
} else {
None
}
},
)
.collect();
.filter_map(|(i, (s, b))| {
if i % 2 == 0 {
Some((
s.clone(),
ShredMetaBuf {
slot: s.slot(),
index: s.index(),
data_shred: s.is_data(),
shred_buf: b.clone(),
},
))
} else {
None
}
})
.unzip();
let mut result = Shredder::try_recovery(
&shreds,
shred_meta_bufs,
expected_shred_count / 2,
expected_shred_count / 2,
0,
@ -1178,23 +1209,29 @@ mod tests {
assert_eq!(data[..], result[..data.len()]);
// Test3: Try recovery/reassembly with 3 missing data shreds + 2 coding shreds. Hint: should work
let mut shreds: Vec<Shred> = shredder
let (mut shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder
.shred_tuples
.iter()
.enumerate()
.filter_map(
|(i, (s, _))| {
if i % 2 != 0 {
Some(s.clone())
} else {
None
}
},
)
.collect();
.filter_map(|(i, (s, b))| {
if i % 2 != 0 {
Some((
s.clone(),
ShredMetaBuf {
slot: s.slot(),
index: s.index(),
data_shred: s.is_data(),
shred_buf: b.clone(),
},
))
} else {
None
}
})
.unzip();
let mut result = Shredder::try_recovery(
&shreds,
shred_meta_bufs,
expected_shred_count / 2,
expected_shred_count / 2,
0,
@ -1274,23 +1311,29 @@ mod tests {
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
let mut shreds: Vec<Shred> = shredder
let (mut shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder
.shred_tuples
.iter()
.enumerate()
.filter_map(
|(i, (s, _))| {
if i % 2 != 0 {
Some(s.clone())
} else {
None
}
},
)
.collect();
.filter_map(|(i, (s, b))| {
if i % 2 != 0 {
Some((
s.clone(),
ShredMetaBuf {
slot: s.slot(),
index: s.index(),
data_shred: s.is_data(),
shred_buf: b.clone(),
},
))
} else {
None
}
})
.unzip();
let mut result = Shredder::try_recovery(
&shreds,
shred_meta_bufs,
expected_shred_count / 2,
expected_shred_count / 2,
0,
@ -1390,23 +1433,29 @@ mod tests {
let expected_shred_count = ((data.len() / approx_shred_payload_size) + 1) * 2;
assert_eq!(shredder.shred_tuples.len(), expected_shred_count);
let mut shreds: Vec<Shred> = shredder
let (mut shreds, shred_meta_bufs): (Vec<Shred>, Vec<ShredMetaBuf>) = shredder
.shred_tuples
.iter()
.enumerate()
.filter_map(
|(i, (s, _))| {
if i % 2 != 0 {
Some(s.clone())
} else {
None
}
},
)
.collect();
.filter_map(|(i, (s, b))| {
if i % 2 != 0 {
Some((
s.clone(),
ShredMetaBuf {
slot: s.slot(),
index: s.index(),
data_shred: s.is_data(),
shred_buf: b.clone(),
},
))
} else {
None
}
})
.unzip();
let mut result = Shredder::try_recovery(
&shreds,
shred_meta_bufs.clone(),
expected_shred_count / 2,
expected_shred_count / 2,
25,
@ -1464,7 +1513,7 @@ mod tests {
// Test7: Try recovery/reassembly with incorrect slot. Hint: does not recover any shreds
let result = Shredder::try_recovery(
&shreds,
shred_meta_bufs.clone(),
expected_shred_count / 2,
expected_shred_count / 2,
25,
@ -1476,7 +1525,7 @@ mod tests {
// Test8: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds
assert_matches!(
Shredder::try_recovery(
&shreds,
shred_meta_bufs.clone(),
expected_shred_count / 2,
expected_shred_count / 2,
15,
@ -1488,7 +1537,7 @@ mod tests {
// Test9: Try recovery/reassembly with incorrect index. Hint: does not recover any shreds
assert_matches!(
Shredder::try_recovery(
&shreds,
shred_meta_bufs.clone(),
expected_shred_count / 2,
expected_shred_count / 2,
35,