* Fix inserting bogus is_last blobs into blocktree

* Check for pre-existing blob before insert

* Ignore test that performs concurrent writes on blocktree as that is not supported
carllin 2019-04-25 00:04:49 -07:00 committed by GitHub
parent cf91ff8694
commit c4d9dff590
2 changed files with 215 additions and 55 deletions
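For context on the first bullet: nothing in the old insert path stopped a peer from sending a blob that falsely claims to be last in the slot, or one indexed past the declared end. A minimal sketch of the two cases the new checks reject (hypothetical, pared-down types; the real logic is should_insert_blob in the diff below):

struct SlotState {
    received: u64,   // one past the highest blob index seen so far
    last_index: u64, // index of the blob flagged is_last (std::u64::MAX if none yet)
}

fn is_bogus(state: &SlotState, index: u64, is_last: bool) -> bool {
    // Reject a blob indexed at or beyond the declared end of the slot.
    index >= state.last_index
        // Reject a blob claiming to be last while higher-indexed blobs
        // have already been received.
        || (is_last && index < state.received)
}

So with received = 9 and last_index still unset, a blob at index 8 flagged is_last is rejected, and once a genuine is_last blob pins last_index, any later blob at or past that index is dropped.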


@@ -985,6 +985,14 @@ impl Blocktree {
Ok(())
}
fn data_blob_exists(&self, slot: u64, index: u64) -> bool {
self.erasure_meta_cf
.get((slot, ErasureMeta::set_index_for(index)))
.expect("Expect database get to succeed")
.map(|e| e.is_data_present(index))
.unwrap_or(false)
}
fn is_orphan(meta: &SlotMeta) -> bool {
// If we have no parent, then this is the head of a detached chain of
// slots
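A note on data_blob_exists above: rather than probing the data column itself, it consults the erasure metadata covering the blob's set. Presumably ErasureMeta keeps a per-set presence bitmask along these lines (a hedged sketch; the field names and the NUM_DATA value are assumptions, not the actual definition):

const NUM_DATA: u64 = 16; // data blobs per erasure set (assumed)

struct ErasureMetaSketch {
    set_index: u64, // which erasure set this metadata covers
    data: u64,      // bit i set => data blob (set_index * NUM_DATA + i) present
}

impl ErasureMetaSketch {
    // Counterpart of ErasureMeta::set_index_for: map a blob index to its set.
    fn set_index_for(index: u64) -> u64 {
        index / NUM_DATA
    }

    // Counterpart of is_data_present: test the presence bit for this index.
    fn is_data_present(&self, index: u64) -> bool {
        let bit = index - self.set_index * NUM_DATA;
        self.data & (1u64 << bit) != 0
    }
}

One trip to the erasure metadata column answers "does this blob already exist?" without deserializing blob data.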
@@ -1106,7 +1114,8 @@ impl Blocktree {
let slot_meta = &mut entry.0.borrow_mut();
-// This slot is full, skip the bogus blob
-if slot_meta.is_full() {
+// Check if this blob should be inserted
+if !self.should_insert_blob(&slot_meta, &prev_inserted_blob_datas, blob) {
false
} else {
let _ = self.insert_data_blob(blob, prev_inserted_blob_datas, slot_meta, write_batch);
@@ -1126,12 +1135,6 @@ impl Blocktree {
let blob_slot = blob_to_insert.slot();
let blob_size = blob_to_insert.size();
-if blob_index < slot_meta.consumed
-|| prev_inserted_blob_datas.contains_key(&(blob_slot, blob_index))
-{
-return Err(Error::BlocktreeError(BlocktreeError::BlobForIndexExists));
-}
let new_consumed = {
if slot_meta.consumed == blob_index {
let blob_datas = self.get_slot_consecutive_blobs(
@@ -1305,6 +1308,61 @@ impl Blocktree {
self.db.write(batch)?;
Ok(())
}
fn should_insert_blob(
&self,
slot: &SlotMeta,
prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>,
blob: &Blob,
) -> bool {
let blob_index = blob.index();
let blob_slot = blob.slot();
// Check that the blob doesn't already exist
if blob_index < slot.consumed
|| prev_inserted_blob_datas.contains_key(&(blob_slot, blob_index))
|| self.data_blob_exists(blob_slot, blob_index)
{
return false;
}
// Check that we do not receive blobs with an index >= the last_index
// for the slot
let last_index = slot.last_index;
if blob_index >= last_index {
solana_metrics::submit(
solana_metrics::influxdb::Point::new("blocktree_error")
.add_field(
"error",
solana_metrics::influxdb::Value::String(format!(
"Received last blob with index {} >= slot.last_index {}",
blob_index, last_index
)),
)
.to_owned(),
);
return false;
}
// Check that we do not receive a blob with "last_index" true, but index
// less than our current received
if blob.is_last_in_slot() && blob_index < slot.received {
solana_metrics::submit(
solana_metrics::influxdb::Point::new("blocktree_error")
.add_field(
"error",
solana_metrics::influxdb::Value::String(format!(
"Received last blob with index {} < slot.received {}",
blob_index, slot.received
)),
)
.to_owned(),
);
return false;
}
true
}
}
// Creates a new ledger with slot 0 full of ticks (and only ticks).
@@ -2696,6 +2754,69 @@ pub mod tests {
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_should_insert_blob() {
let (mut blobs, _) = make_slot_entries(0, 0, 20);
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Insert the first 5 blobs; we don't have an "is_last" blob yet
blocktree.insert_data_blobs(&blobs[0..5]).unwrap();
// Trying to insert a blob with index less than slot_meta.consumed should fail
let slot_meta = blocktree.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, 5);
assert!(!blocktree.should_insert_blob(&slot_meta, &HashMap::new(), &blobs[4].clone()));
// Trying to insert the same blob again should fail
blocktree.insert_data_blobs(&blobs[7..8]).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
assert!(!blocktree.should_insert_blob(&slot_meta, &HashMap::new(), &blobs[7].clone()));
// Trying to insert another "is_last" blob with index < the received index
// should fail
blocktree.insert_data_blobs(&blobs[8..9]).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.received, 9);
blobs[8].set_is_last_in_slot();
assert!(!blocktree.should_insert_blob(&slot_meta, &HashMap::new(), &blobs[8].clone()));
// Insert the 10th blob, which is marked as "is_last"
blobs[9].set_is_last_in_slot();
blocktree.insert_data_blobs(&blobs[9..10]).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
// Trying to insert a blob with index > the "is_last" blob should fail
assert!(!blocktree.should_insert_blob(&slot_meta, &HashMap::new(), &blobs[10].clone()));
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_insert_multiple_is_last() {
let (mut blobs, _) = make_slot_entries(0, 0, 20);
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Inserting multiple blobs with the is_last flag set should only insert
// the first blob with the "is_last" flag, and drop the rest
for i in 6..20 {
blobs[i].set_is_last_in_slot();
}
blocktree.insert_data_blobs(&blobs[..]).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, 7);
assert_eq!(slot_meta.received, 7);
assert_eq!(slot_meta.last_index, 6);
assert!(slot_meta.is_full());
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
mod erasure {
use super::*;
use crate::blocktree::meta::ErasureMetaStatus;
@@ -2977,6 +3098,7 @@ pub mod tests {
}
#[test]
#[ignore]
fn test_recovery_multi_slot_multi_thread() {
use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
use std::thread;
@@ -3034,7 +3156,7 @@ pub mod tests {
// is a 50/50 chance of attempting to write the coding blobs first or the data blobs
// first.
// The goal is to be as racy as possible and cover a wide range of situations
-for _ in 0..N_THREADS {
+for thread_id in 0..N_THREADS {
let blocktree = Arc::clone(&blocktree);
let mut rng = SmallRng::from_rng(&mut rng).unwrap();
let model = model.clone();
@@ -3047,55 +3169,78 @@ pub mod tests {
.choose_multiple(&mut rng, num_erasure_sets);
for erasure_set in unordered_sets {
-if rng.gen() {
-blocktree
-.write_shared_blobs(&erasure_set.data)
-.expect("Writing data blobs must succeed");
-debug!(
-"multislot: wrote data: slot: {}, erasure_set: {}",
-slot, erasure_set.set_index
-);
-for shared_coding_blob in &erasure_set.coding {
-let blob = shared_coding_blob.read().unwrap();
-let size = blob.size() + BLOB_HEADER_SIZE;
-blocktree
-.put_coding_blob_bytes(
-slot,
-blob.index(),
-&blob.data[..size],
-)
-.expect("Writing coding blobs must succeed");
-}
-debug!(
-"multislot: wrote coding: slot: {}, erasure_set: {}",
-slot, erasure_set.set_index
-);
-} else {
-// write coding blobs first, then write the data blobs.
-for shared_coding_blob in &erasure_set.coding {
-let blob = shared_coding_blob.read().unwrap();
-let size = blob.size() + BLOB_HEADER_SIZE;
-blocktree
-.put_coding_blob_bytes(
-slot,
-blob.index(),
-&blob.data[..size],
-)
-.expect("Writing coding blobs must succeed");
-}
-debug!(
-"multislot: wrote coding: slot: {}, erasure_set: {}",
-slot, erasure_set.set_index
-);
-blocktree
-.write_shared_blobs(&erasure_set.data)
-.expect("Writing data blobs must succeed");
-debug!(
-"multislot: wrote data: slot: {}, erasure_set: {}",
-slot, erasure_set.set_index
-);
-}
+let mut attempt = 0;
+loop {
+if rng.gen() {
+blocktree
+.write_shared_blobs(&erasure_set.data)
+.expect("Writing data blobs must succeed");
+debug!(
+"multislot: wrote data: slot: {}, erasure_set: {}",
+slot, erasure_set.set_index
+);
+for shared_coding_blob in &erasure_set.coding {
+let blob = shared_coding_blob.read().unwrap();
+let size = blob.size() + BLOB_HEADER_SIZE;
+blocktree
+.put_coding_blob_bytes(
+slot,
+blob.index(),
+&blob.data[..size],
+)
+.expect("Writing coding blobs must succeed");
+}
+debug!(
+"multislot: wrote coding: slot: {}, erasure_set: {}",
+slot, erasure_set.set_index
+);
+} else {
+// write coding blobs first, then write the data blobs.
+for shared_coding_blob in &erasure_set.coding {
+let blob = shared_coding_blob.read().unwrap();
+let size = blob.size() + BLOB_HEADER_SIZE;
+blocktree
+.put_coding_blob_bytes(
+slot,
+blob.index(),
+&blob.data[..size],
+)
+.expect("Writing coding blobs must succeed");
+}
+debug!(
+"multislot: wrote coding: slot: {}, erasure_set: {}",
+slot, erasure_set.set_index
+);
+blocktree
+.write_shared_blobs(&erasure_set.data)
+.expect("Writing data blobs must succeed");
+debug!(
+"multislot: wrote data: slot: {}, erasure_set: {}",
+slot, erasure_set.set_index
+);
+}
+// due to racing, some blobs might not be inserted. don't stop
+// trying until *some* thread succeeds in writing everything and
+// triggering recovery.
+let erasure_meta = blocktree
+.erasure_meta_cf
+.get((slot, erasure_set.set_index))
+.unwrap()
+.unwrap();
+let status = erasure_meta.status();
+attempt += 1;
+info!(
+"[multi_slot] thread_id: {}, attempt: {}, slot: {}, set_index: {}, status: {:?}",
+thread_id, attempt, slot, erasure_set.set_index, status
+);
+if status == ErasureMetaStatus::DataFull {
+break;
+}
+}
}
}


@@ -35,7 +35,22 @@ impl SlotMeta {
if self.last_index == std::u64::MAX {
return false;
}
-assert!(self.consumed <= self.last_index + 1);
+// Should never happen
+if self.consumed > self.last_index + 1 {
+solana_metrics::submit(
+solana_metrics::influxdb::Point::new("blocktree_error")
+.add_field(
+"error",
+solana_metrics::influxdb::Value::String(format!(
+"Observed a slot meta with consumed: {} > meta.last_index + 1: {}",
+self.consumed,
+self.last_index + 1
+)),
+)
+.to_owned(),
+);
+}
self.consumed == self.last_index + 1
}
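To make the is_full condition above concrete, a pared-down sketch (hypothetical struct, not the real SlotMeta) with the same arithmetic:

struct Meta {
    consumed: u64,   // count of consecutive blobs present starting at index 0
    last_index: u64, // std::u64::MAX until a blob flagged is_last arrives
}

impl Meta {
    fn is_full(&self) -> bool {
        // No is_last blob seen yet, so the slot cannot be full.
        if self.last_index == std::u64::MAX {
            return false;
        }
        // Full exactly when blobs 0..=last_index are all present.
        self.consumed == self.last_index + 1
    }
}

// e.g. the expectations in test_insert_multiple_is_last above:
// consumed: 7, last_index: 6 => full; consumed: 5, last_index: 6 => not full.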