Handle blockstore insert dup checks (#16051)

carllin 2021-03-22 16:18:22 -07:00 committed by GitHub
parent fde43a906d
commit d76ad33597
1 changed file with 96 additions and 29 deletions
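The fix threads the batch of shreds being inserted by the current call (just_inserted_data_shreds, keyed by (slot, index)) through the duplicate checks in should_insert_data_shred, so the shred used for comparison can be found even when it has not yet been committed to the database; returning Cow<Vec<u8>> lets the borrowed in-memory payload be used without a copy, cloning only on the database fallback. Below is a minimal, self-contained sketch of that lookup pattern; Store, its db field, and payload_from_batch_or_db are illustrative stand-ins, not Solana's API.

    use std::borrow::Cow;
    use std::collections::HashMap;

    // Illustrative stand-in for the persistent side of the blockstore.
    struct Store {
        db: HashMap<(u64, u64), Vec<u8>>, // persisted payloads keyed by (slot, index)
    }

    impl Store {
        // Prefer the copy that was just inserted in this batch (still in memory);
        // fall back to the persisted copy only when it is not in the batch.
        fn payload_from_batch_or_db<'a>(
            &'a self,
            just_inserted: &'a HashMap<(u64, u64), Vec<u8>>,
            slot: u64,
            index: u64,
        ) -> Cow<'a, Vec<u8>> {
            if let Some(payload) = just_inserted.get(&(slot, index)) {
                Cow::Borrowed(payload) // no clone while the batch is alive
            } else {
                // Assumed invariant, as in the commit: if it is not in the batch,
                // it must already exist in the backing store.
                Cow::Owned(self.db.get(&(slot, index)).cloned().unwrap())
            }
        }
    }

    fn main() {
        let store = Store {
            db: HashMap::from([((1, 0), vec![1u8, 2, 3])]),
        };
        let mut batch = HashMap::new();
        batch.insert((1, 1), vec![4u8, 5, 6]);

        // Found in the in-memory batch: borrowed, no copy made.
        assert!(matches!(
            store.payload_from_batch_or_db(&batch, 1, 1),
            Cow::Borrowed(_)
        ));
        // Not in the batch: cloned out of the backing store.
        assert!(matches!(
            store.payload_from_batch_or_db(&batch, 1, 0),
            Cow::Owned(_)
        ));
        println!("duplicate-check payload lookup works");
    }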

@@ -41,6 +41,7 @@ use solana_transaction_status::{
     TransactionStatusMeta, TransactionWithStatusMeta,
 };
 use std::{
+    borrow::Cow,
     cell::RefCell,
     cmp,
     collections::{HashMap, HashSet},
@@ -1187,6 +1188,7 @@ impl Blockstore {
         } else if !self.should_insert_data_shred(
             &shred,
             slot_meta,
+            just_inserted_data_shreds,
             &self.last_root,
             leader_schedule,
             is_recovered,
@@ -1258,10 +1260,26 @@ impl Blockstore {
         shred_index < slot_meta.consumed || data_index.is_present(shred_index)
     }

+    fn get_data_shred_from_just_inserted_or_db<'a>(
+        &'a self,
+        just_inserted_data_shreds: &'a HashMap<(u64, u64), Shred>,
+        slot: Slot,
+        index: u64,
+    ) -> Cow<'a, Vec<u8>> {
+        if let Some(shred) = just_inserted_data_shreds.get(&(slot, index)) {
+            Cow::Borrowed(&shred.payload)
+        } else {
+            // If it doesn't exist in the just inserted set, it must exist in
+            // the backing store
+            Cow::Owned(self.get_data_shred(slot, index).unwrap().unwrap())
+        }
+    }
+
     fn should_insert_data_shred(
         &self,
         shred: &Shred,
         slot_meta: &SlotMeta,
+        just_inserted_data_shreds: &HashMap<(u64, u64), Shred>,
         last_root: &RwLock<u64>,
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
         is_recovered: bool,
@@ -1283,25 +1301,34 @@ impl Blockstore {
                 .map(|leader_schedule| leader_schedule.slot_leader_at(slot, None))
                 .unwrap_or(None);

-            let ending_shred = self.get_data_shred(slot, last_index).unwrap().unwrap();
+            let ending_shred: Cow<Vec<u8>> = self.get_data_shred_from_just_inserted_or_db(
+                just_inserted_data_shreds,
+                slot,
+                last_index,
+            );
+
             if self
-                .store_duplicate_if_not_existing(slot, ending_shred, shred.payload.clone())
+                .store_duplicate_if_not_existing(
+                    slot,
+                    ending_shred.into_owned(),
+                    shred.payload.clone(),
+                )
                 .is_err()
             {
                 warn!("store duplicate error");
             }
             datapoint_error!(
                 "blockstore_error",
                 (
                     "error",
                     format!(
                         "Leader {:?}, slot {}: received index {} >= slot.last_index {}, is_recovered: {}",
                         leader_pubkey, slot, shred_index, last_index, is_recovered
                     ),
                     String
                 )
             );
             return false;
         }

         // Check that we do not receive a shred with "last_index" true, but shred_index
@@ -1311,28 +1338,34 @@ impl Blockstore {
                 .map(|leader_schedule| leader_schedule.slot_leader_at(slot, None))
                 .unwrap_or(None);

-            let ending_shred = self
-                .get_data_shred(slot, slot_meta.received - 1)
-                .unwrap()
-                .unwrap();
+            let ending_shred: Cow<Vec<u8>> = self.get_data_shred_from_just_inserted_or_db(
+                just_inserted_data_shreds,
+                slot,
+                slot_meta.received - 1,
+            );
+
             if self
-                .store_duplicate_if_not_existing(slot, ending_shred, shred.payload.clone())
+                .store_duplicate_if_not_existing(
+                    slot,
+                    ending_shred.into_owned(),
+                    shred.payload.clone(),
+                )
                 .is_err()
             {
                 warn!("store duplicate error");
             }
             datapoint_error!(
                 "blockstore_error",
                 (
                     "error",
                     format!(
                         "Leader {:?}, slot {}: received shred_index {} < slot.received {}, is_recovered: {}",
                         leader_pubkey, slot, shred_index, slot_meta.received, is_recovered
                     ),
                     String
                 )
             );
             return false;
         }
@@ -5165,7 +5198,14 @@ pub mod tests {
             }
         };
         assert_eq!(
-            blockstore.should_insert_data_shred(&shred7, &slot_meta, &last_root, None, false),
+            blockstore.should_insert_data_shred(
+                &shred7,
+                &slot_meta,
+                &HashMap::new(),
+                &last_root,
+                None,
+                false
+            ),
             false
         );
         assert!(blockstore.has_duplicate_shreds_in_slot(0));
@@ -5182,7 +5222,14 @@ pub mod tests {
             panic!("Shred in unexpected format")
         }
         assert_eq!(
-            blockstore.should_insert_data_shred(&shred7, &slot_meta, &last_root, None, false),
+            blockstore.should_insert_data_shred(
+                &shred7,
+                &slot_meta,
+                &HashMap::new(),
+                &last_root,
+                None,
+                false
+            ),
             false
         );
     }
@@ -7447,4 +7494,24 @@ pub mod tests {
         }
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
+
+    #[test]
+    fn test_duplicate_last_index() {
+        let num_shreds = 2;
+        let num_entries = max_ticks_per_n_shreds(num_shreds, None);
+        let slot = 1;
+        let (mut shreds, _) = make_slot_entries(slot, 0, num_entries);
+
+        // Mark both as last shred
+        shreds[0].set_last_in_slot();
+        shreds[1].set_last_in_slot();
+
+        let blockstore_path = get_tmp_ledger_path!();
+        {
+            let blockstore = Blockstore::open(&blockstore_path).unwrap();
+            blockstore.insert_shreds(shreds, None, false).unwrap();
+            assert!(blockstore.get_duplicate_slot(slot).is_some());
+        }
+        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
+    }
 }
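The new test_duplicate_last_index test exercises the case the second check above guards against: two shreds in the same slot that both claim to be the last shred, which should mark the slot as duplicate. Below is a rough, standalone sketch of that bookkeeping; MiniStore, SlotState, and this pared-down get_duplicate_slot (which only reports a flag) are invented for illustration and are not the blockstore's actual logic or API.

    use std::collections::HashMap;

    // Illustrative shred: only the fields the check needs.
    struct Shred {
        slot: u64,
        index: u64,
        is_last_in_slot: bool,
    }

    #[derive(Default)]
    struct SlotState {
        last_index: Option<u64>, // index declared as the end of the slot
        received: u64,           // one past the highest index seen so far
        is_duplicate: bool,
    }

    #[derive(Default)]
    struct MiniStore {
        slots: HashMap<u64, SlotState>,
    }

    impl MiniStore {
        fn insert(&mut self, shred: &Shred) {
            let state = self.slots.entry(shred.slot).or_default();
            // A shred past the declared end, or a "last" marker behind what was
            // already received, proves the leader produced conflicting versions.
            let past_declared_end = state.last_index.map_or(false, |last| shred.index > last);
            let last_marker_too_early = shred.is_last_in_slot && shred.index + 1 < state.received;
            if past_declared_end || last_marker_too_early {
                state.is_duplicate = true;
                return;
            }
            if shred.is_last_in_slot {
                state.last_index = Some(shred.index);
            }
            state.received = state.received.max(shred.index + 1);
        }

        fn get_duplicate_slot(&self, slot: u64) -> Option<()> {
            self.slots.get(&slot).filter(|s| s.is_duplicate).map(|_| ())
        }
    }

    fn main() {
        let mut store = MiniStore::default();
        // Two shreds both claiming to end slot 1, at different indexes.
        store.insert(&Shred { slot: 1, index: 0, is_last_in_slot: true });
        store.insert(&Shred { slot: 1, index: 1, is_last_in_slot: true });
        assert!(store.get_duplicate_slot(1).is_some());
        println!("slot flagged as duplicate");
    }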