SlotMeta is incorrectly updated on insertion of erasure blobs (#4289)
* Fix put_coding_blob_bytes to properly update slotmetas and chaining
This commit is contained in:
parent
1d54d29076
commit
a79fbbafc9
|
@ -6,6 +6,7 @@ use crate::entry::Entry;
|
||||||
use crate::erasure::{self, Session};
|
use crate::erasure::{self, Session};
|
||||||
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
|
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
|
|
||||||
#[cfg(feature = "kvstore")]
|
#[cfg(feature = "kvstore")]
|
||||||
use solana_kvstore as kvstore;
|
use solana_kvstore as kvstore;
|
||||||
|
|
||||||
|
@ -531,6 +532,7 @@ impl Blocktree {
|
||||||
|
|
||||||
writebatch.put_bytes::<cf::Coding>((slot, index), bytes)?;
|
writebatch.put_bytes::<cf::Coding>((slot, index), bytes)?;
|
||||||
|
|
||||||
|
let recovered_data = {
|
||||||
if let Some((data, coding)) = try_erasure_recover(
|
if let Some((data, coding)) = try_erasure_recover(
|
||||||
&self.db,
|
&self.db,
|
||||||
&self.session,
|
&self.session,
|
||||||
|
@ -541,16 +543,6 @@ impl Blocktree {
|
||||||
)? {
|
)? {
|
||||||
let mut erasure_meta_working_set = HashMap::new();
|
let mut erasure_meta_working_set = HashMap::new();
|
||||||
erasure_meta_working_set.insert((slot, set_index), erasure_meta);
|
erasure_meta_working_set.insert((slot, set_index), erasure_meta);
|
||||||
|
|
||||||
insert_data_blob_batch(
|
|
||||||
&data[..],
|
|
||||||
&self.db,
|
|
||||||
&mut HashMap::new(),
|
|
||||||
&mut erasure_meta_working_set,
|
|
||||||
&mut HashMap::new(),
|
|
||||||
&mut writebatch,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
erasure_meta = *erasure_meta_working_set.values().next().unwrap();
|
erasure_meta = *erasure_meta_working_set.values().next().unwrap();
|
||||||
|
|
||||||
for coding_blob in coding {
|
for coding_blob in coding {
|
||||||
|
@ -561,11 +553,31 @@ impl Blocktree {
|
||||||
&coding_blob.data[..BLOB_HEADER_SIZE + coding_blob.size()],
|
&coding_blob.data[..BLOB_HEADER_SIZE + coding_blob.size()],
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
Some(data)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
writebatch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
|
writebatch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
|
||||||
|
|
||||||
batch_processor.write(writebatch)?;
|
batch_processor.write(writebatch)?;
|
||||||
|
drop(batch_processor);
|
||||||
|
if let Some(data) = recovered_data {
|
||||||
|
if !data.is_empty() {
|
||||||
|
self.insert_data_blobs(&data)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn put_many_coding_blob_bytes(&self, coding_blobs: &[SharedBlob]) -> Result<()> {
|
||||||
|
for shared_coding_blob in coding_blobs {
|
||||||
|
let blob = shared_coding_blob.read().unwrap();
|
||||||
|
assert!(blob.is_coding());
|
||||||
|
let size = blob.size() + BLOB_HEADER_SIZE;
|
||||||
|
self.put_coding_blob_bytes(blob.slot(), blob.index(), &blob.data[..size])?
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -1600,9 +1612,11 @@ pub mod tests {
|
||||||
use crate::entry::{
|
use crate::entry::{
|
||||||
create_ticks, make_tiny_test_entries, make_tiny_test_entries_from_hash, Entry, EntrySlice,
|
create_ticks, make_tiny_test_entries, make_tiny_test_entries_from_hash, Entry, EntrySlice,
|
||||||
};
|
};
|
||||||
|
use crate::erasure::{CodingGenerator, NUM_CODING, NUM_DATA};
|
||||||
use crate::packet;
|
use crate::packet;
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
use rand::thread_rng;
|
use rand::thread_rng;
|
||||||
|
use rand::Rng;
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use std::cmp::min;
|
use std::cmp::min;
|
||||||
|
@ -2579,7 +2593,7 @@ pub mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
pub fn test_chaining_tree() {
|
pub fn test_chaining_tree() {
|
||||||
let blocktree_path = get_tmp_ledger_path("test_chaining_forks");
|
let blocktree_path = get_tmp_ledger_path("test_chaining_tree");
|
||||||
{
|
{
|
||||||
let blocktree = Blocktree::open(&blocktree_path).unwrap();
|
let blocktree = Blocktree::open(&blocktree_path).unwrap();
|
||||||
let num_tree_levels = 6;
|
let num_tree_levels = 6;
|
||||||
|
@ -2587,7 +2601,7 @@ pub mod tests {
|
||||||
let branching_factor: u64 = 4;
|
let branching_factor: u64 = 4;
|
||||||
// Number of slots that will be in the tree
|
// Number of slots that will be in the tree
|
||||||
let num_slots = (branching_factor.pow(num_tree_levels) - 1) / (branching_factor - 1);
|
let num_slots = (branching_factor.pow(num_tree_levels) - 1) / (branching_factor - 1);
|
||||||
let entries_per_slot = 2;
|
let entries_per_slot = NUM_DATA as u64;
|
||||||
assert!(entries_per_slot > 1);
|
assert!(entries_per_slot > 1);
|
||||||
|
|
||||||
let (mut blobs, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
|
let (mut blobs, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
|
||||||
|
@ -2613,7 +2627,25 @@ pub mod tests {
|
||||||
blob.set_parent(slot_parent);
|
blob.set_parent(slot_parent);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let shared_blobs: Vec<_> = slot_blobs
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(|blob| Arc::new(RwLock::new(blob)))
|
||||||
|
.collect();
|
||||||
|
let mut coding_generator = CodingGenerator::new(Arc::clone(&blocktree.session));
|
||||||
|
let coding_blobs = coding_generator.next(&shared_blobs);
|
||||||
|
assert_eq!(coding_blobs.len(), NUM_CODING);
|
||||||
|
|
||||||
|
let mut rng = thread_rng();
|
||||||
|
|
||||||
|
// Randomly pick whether to insert erasure or coding blobs first
|
||||||
|
if rng.gen_bool(0.5) {
|
||||||
blocktree.write_blobs(slot_blobs).unwrap();
|
blocktree.write_blobs(slot_blobs).unwrap();
|
||||||
|
blocktree.put_many_coding_blob_bytes(&coding_blobs).unwrap();
|
||||||
|
} else {
|
||||||
|
blocktree.put_many_coding_blob_bytes(&coding_blobs).unwrap();
|
||||||
|
blocktree.write_blobs(slot_blobs).unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure everything chains correctly
|
// Make sure everything chains correctly
|
||||||
|
@ -3218,7 +3250,8 @@ pub mod tests {
|
||||||
let ledger_path = get_tmp_ledger_path!();
|
let ledger_path = get_tmp_ledger_path!();
|
||||||
|
|
||||||
let blocktree = Blocktree::open(&ledger_path).unwrap();
|
let blocktree = Blocktree::open(&ledger_path).unwrap();
|
||||||
let data_blobs = make_slot_entries(slot, 0, 3 * NUM_DATA as u64)
|
let num_sets = 3;
|
||||||
|
let data_blobs = make_slot_entries(slot, 0, num_sets * NUM_DATA as u64)
|
||||||
.0
|
.0
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(Blob::into)
|
.map(Blob::into)
|
||||||
|
@ -3233,17 +3266,12 @@ pub mod tests {
|
||||||
assert_eq!(coding_blobs.len(), NUM_CODING);
|
assert_eq!(coding_blobs.len(), NUM_CODING);
|
||||||
|
|
||||||
let deleted_data = data_blobs[NUM_DATA - 1].clone();
|
let deleted_data = data_blobs[NUM_DATA - 1].clone();
|
||||||
debug!(
|
|
||||||
"deleted: slot: {}, index: {}",
|
|
||||||
deleted_data.read().unwrap().slot(),
|
|
||||||
deleted_data.read().unwrap().index()
|
|
||||||
);
|
|
||||||
|
|
||||||
blocktree
|
blocktree
|
||||||
.write_shared_blobs(&data_blobs[..NUM_DATA - 1])
|
.write_shared_blobs(&data_blobs[..NUM_DATA - 1])
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// this should trigger recovery
|
// This should trigger recovery of the missing data blob
|
||||||
for shared_coding_blob in coding_blobs {
|
for shared_coding_blob in coding_blobs {
|
||||||
let blob = shared_coding_blob.read().unwrap();
|
let blob = shared_coding_blob.read().unwrap();
|
||||||
let size = blob.size() + BLOB_HEADER_SIZE;
|
let size = blob.size() + BLOB_HEADER_SIZE;
|
||||||
|
@ -3254,6 +3282,20 @@ pub mod tests {
|
||||||
(slot, blob.index());
|
(slot, blob.index());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify the slot meta
|
||||||
|
let slot_meta = blocktree.meta(slot).unwrap().unwrap();
|
||||||
|
assert_eq!(slot_meta.consumed, (NUM_DATA * (set_index + 1)) as u64);
|
||||||
|
assert_eq!(slot_meta.received, (NUM_DATA * (set_index + 1)) as u64);
|
||||||
|
assert_eq!(slot_meta.parent_slot, 0);
|
||||||
|
assert!(slot_meta.next_slots.is_empty());
|
||||||
|
assert_eq!(slot_meta.is_connected, true);
|
||||||
|
if set_index as u64 == num_sets - 1 {
|
||||||
|
assert_eq!(
|
||||||
|
slot_meta.last_index,
|
||||||
|
(NUM_DATA * (set_index + 1) - 1) as u64
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
let erasure_meta = blocktree
|
let erasure_meta = blocktree
|
||||||
.erasure_meta_cf
|
.erasure_meta_cf
|
||||||
.get((slot, set_index as u64))
|
.get((slot, set_index as u64))
|
||||||
|
|
Loading…
Reference in New Issue