Add repair number per slot (#18082)

Authored by sakridge on 2021-06-30 18:20:07 +02:00; committed by GitHub.
parent 00d1125e98
commit 8d9a6deda4
2 changed files with 144 additions and 55 deletions
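
In outline, the change threads a per-shred repaired flag from the window service into the blockstore, tallies repaired and recovered shreds per slot in a new SlotsStats map, and reports both counts on the shred_insert_is_full datapoint once a slot becomes full. Below is a minimal, self-contained sketch of that bookkeeping pattern; it is not code from the commit, and the record/on_slot_full helper names are illustrative:

use std::collections::BTreeMap;

type Slot = u64;

// Mirrors the enum the commit introduces in blockstore.rs.
#[derive(PartialEq, Debug, Clone)]
enum ShredSource {
    Turbine,
    Repaired,
    Recovered,
}

#[derive(Default)]
struct SlotStats {
    num_repaired: usize,
    num_recovered: usize,
}

#[derive(Default)]
struct SlotsStats {
    stats: BTreeMap<Slot, SlotStats>,
}

impl SlotsStats {
    // Tally one inserted data shred against its slot.
    fn record(&mut self, slot: Slot, source: ShredSource) {
        let entry = self.stats.entry(slot).or_default();
        match source {
            ShredSource::Repaired => entry.num_repaired += 1,
            ShredSource::Recovered => entry.num_recovered += 1,
            ShredSource::Turbine => (),
        }
    }

    // When a slot becomes full, drain its entry and hand the counts to the
    // caller (the commit feeds them into a metrics datapoint).
    fn on_slot_full(&mut self, slot: Slot) -> (usize, usize) {
        self.stats
            .remove(&slot)
            .map(|e| (e.num_repaired, e.num_recovered))
            .unwrap_or((0, 0))
    }
}

fn main() {
    let mut stats = SlotsStats::default();
    stats.record(42, ShredSource::Turbine);
    stats.record(42, ShredSource::Repaired);
    stats.record(42, ShredSource::Recovered);
    assert_eq!(stats.on_slot_full(42), (1, 1));
    assert_eq!(stats.on_slot_full(42), (0, 0)); // already drained
}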

@@ -188,9 +188,14 @@ where
     }
     prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests);
+    let repairs: Vec<_> = repair_infos
+        .iter()
+        .map(|repair_info| repair_info.is_some())
+        .collect();
     let (completed_data_sets, inserted_indices) = blockstore.insert_shreds_handle_duplicate(
         shreds,
+        repairs,
         Some(leader_schedule_cache),
         false,
         &handle_duplicate,
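
For context, repair_infos carries one Option per shred, so the projected repairs vector stays index-aligned with shreds and can be zipped back together inside the blockstore. A tiny sketch of that parallel-vector handoff, assuming a hypothetical RepairMeta stand-in for the real repair metadata type:

use std::net::SocketAddr;

// Hypothetical stand-in for the per-shred repair metadata; only its
// presence (Some vs. None) matters to the insertion path.
struct RepairMeta {
    _from_addr: SocketAddr,
}

fn main() {
    let addr: SocketAddr = "127.0.0.1:8001".parse().unwrap();
    // One entry per shred: Some(..) iff that shred arrived via repair.
    let repair_infos: Vec<Option<RepairMeta>> =
        vec![None, Some(RepairMeta { _from_addr: addr }), None];
    let repairs: Vec<bool> = repair_infos
        .iter()
        .map(|repair_info| repair_info.is_some())
        .collect();
    assert_eq!(repairs, vec![false, true, false]);
    // insert_shreds_handle_duplicate asserts this alignment holds.
    assert_eq!(repairs.len(), repair_infos.len());
}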

@@ -45,7 +45,7 @@ use std::{
     borrow::Cow,
     cell::RefCell,
     cmp,
-    collections::{HashMap, HashSet},
+    collections::{BTreeMap, HashMap, HashSet},
     convert::TryInto,
     fs,
     io::{Error as IoError, ErrorKind},
@@ -56,6 +56,7 @@ use std::{
         mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
         Arc, Mutex, RwLock, RwLockWriteGuard,
     },
+    time::Instant,
 };
 use thiserror::Error;
 use trees::{Tree, TreeWalk};
@@ -148,6 +149,27 @@ pub struct Blockstore {
     pub completed_slots_senders: Vec<CompletedSlotsSender>,
     pub lowest_cleanup_slot: Arc<RwLock<Slot>>,
     no_compaction: bool,
+    slots_stats: Arc<Mutex<SlotsStats>>,
 }
+
+struct SlotsStats {
+    last_cleanup_ts: Instant,
+    stats: BTreeMap<Slot, SlotStats>,
+}
+
+impl Default for SlotsStats {
+    fn default() -> Self {
+        SlotsStats {
+            last_cleanup_ts: Instant::now(),
+            stats: BTreeMap::new(),
+        }
+    }
+}
+
+#[derive(Default)]
+struct SlotStats {
+    num_repaired: usize,
+    num_recovered: usize,
+}
 
 pub struct IndexMetaWorkingSetEntry {
@@ -165,6 +187,13 @@ pub struct SlotMetaWorkingSetEntry {
     did_insert_occur: bool,
 }
 
+#[derive(PartialEq, Debug, Clone)]
+enum ShredSource {
+    Turbine,
+    Repaired,
+    Recovered,
+}
+
 #[derive(Default)]
 pub struct BlockstoreInsertionMetrics {
     pub num_shreds: usize,
@@ -367,6 +396,7 @@ impl Blockstore {
             last_root,
             lowest_cleanup_slot: Arc::new(RwLock::new(0)),
             no_compaction: false,
+            slots_stats: Arc::new(Mutex::new(SlotsStats::default())),
         };
         if initialize_transaction_status_index {
             blockstore.initialize_transaction_status_index()?;
@@ -758,6 +788,7 @@ impl Blockstore {
     pub fn insert_shreds_handle_duplicate<F>(
         &self,
         shreds: Vec<Shred>,
+        is_repaired: Vec<bool>,
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
         is_trusted: bool,
         handle_duplicate: &F,
@@ -766,6 +797,7 @@ impl Blockstore {
     where
         F: Fn(Shred),
     {
+        assert_eq!(shreds.len(), is_repaired.len());
         let mut total_start = Measure::start("Total elapsed");
         let mut start = Measure::start("Blockstore lock");
         let _lock = self.insert_shreds_lock.lock().unwrap();
@@ -787,46 +819,56 @@ impl Blockstore {
         let mut index_meta_time = 0;
         let mut newly_completed_data_sets: Vec<CompletedDataSetInfo> = vec![];
         let mut inserted_indices = Vec::new();
-        shreds.into_iter().enumerate().for_each(|(i, shred)| {
-            if shred.is_data() {
-                let shred_slot = shred.slot();
-                if let Ok(completed_data_sets) = self.check_insert_data_shred(
-                    shred,
-                    &mut erasure_metas,
-                    &mut index_working_set,
-                    &mut slot_meta_working_set,
-                    &mut write_batch,
-                    &mut just_inserted_data_shreds,
-                    &mut index_meta_time,
-                    is_trusted,
-                    handle_duplicate,
-                    leader_schedule,
-                    false,
-                ) {
-                    newly_completed_data_sets.extend(completed_data_sets.into_iter().map(
-                        |(start_index, end_index)| CompletedDataSetInfo {
-                            slot: shred_slot,
-                            start_index,
-                            end_index,
-                        },
-                    ));
-                    inserted_indices.push(i);
-                    num_inserted += 1;
-                }
-            } else if shred.is_code() {
-                self.check_cache_coding_shred(
-                    shred,
-                    &mut erasure_metas,
-                    &mut index_working_set,
-                    &mut just_inserted_coding_shreds,
-                    &mut index_meta_time,
-                    handle_duplicate,
-                    is_trusted,
-                );
-            } else {
-                panic!("There should be no other case");
-            }
-        });
+        shreds
+            .into_iter()
+            .zip(is_repaired.into_iter())
+            .enumerate()
+            .for_each(|(i, (shred, is_repaired))| {
+                if shred.is_data() {
+                    let shred_slot = shred.slot();
+                    let shred_source = if is_repaired {
+                        ShredSource::Repaired
+                    } else {
+                        ShredSource::Turbine
+                    };
+                    if let Ok(completed_data_sets) = self.check_insert_data_shred(
+                        shred,
+                        &mut erasure_metas,
+                        &mut index_working_set,
+                        &mut slot_meta_working_set,
+                        &mut write_batch,
+                        &mut just_inserted_data_shreds,
+                        &mut index_meta_time,
+                        is_trusted,
+                        handle_duplicate,
+                        leader_schedule,
+                        shred_source,
+                    ) {
+                        newly_completed_data_sets.extend(completed_data_sets.into_iter().map(
+                            |(start_index, end_index)| CompletedDataSetInfo {
+                                slot: shred_slot,
+                                start_index,
+                                end_index,
+                            },
+                        ));
+                        inserted_indices.push(i);
+                        num_inserted += 1;
+                    }
+                } else if shred.is_code() {
+                    self.check_cache_coding_shred(
+                        shred,
+                        &mut erasure_metas,
+                        &mut index_working_set,
+                        &mut just_inserted_coding_shreds,
+                        &mut index_meta_time,
+                        handle_duplicate,
+                        is_trusted,
+                        is_repaired,
+                    );
+                } else {
+                    panic!("There should be no other case");
+                }
+            });
         start.stop();
         let insert_shreds_elapsed = start.as_us();
@@ -861,7 +903,7 @@ impl Blockstore {
                         is_trusted,
                         &handle_duplicate,
                         leader_schedule,
-                        true,
+                        ShredSource::Recovered,
                     ) {
                         Err(InsertDataShredError::Exists) => {
                             num_recovered_exists += 1;
@@ -993,8 +1035,10 @@ impl Blockstore {
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
         is_trusted: bool,
     ) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)> {
+        let shreds_len = shreds.len();
         self.insert_shreds_handle_duplicate(
             shreds,
+            vec![false; shreds_len],
             leader_schedule,
             is_trusted,
             &|_| {},
@@ -1038,6 +1082,7 @@ impl Blockstore {
         index_meta_time: &mut u64,
         handle_duplicate: &F,
         is_trusted: bool,
+        is_repaired: bool,
     ) -> bool
     where
         F: Fn(Shred),
@@ -1105,6 +1150,12 @@ impl Blockstore {
             return false;
         }
 
+        if is_repaired {
+            let mut slots_stats = self.slots_stats.lock().unwrap();
+            let mut e = slots_stats.stats.entry(slot).or_default();
+            e.num_repaired += 1;
+        }
+
         // Should be safe to modify index_meta here. Two cases
         // 1) Recovery happens: Then all inserted erasure metas are removed
         // from just_received_coding_shreds, and nothing will be committed by
@@ -1165,7 +1216,7 @@ impl Blockstore {
         is_trusted: bool,
         handle_duplicate: &F,
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
-        is_recovered: bool,
+        shred_source: ShredSource,
     ) -> std::result::Result<Vec<(u32, u32)>, InsertDataShredError>
     where
         F: Fn(Shred),
@@ -1208,15 +1259,20 @@ impl Blockstore {
                 just_inserted_data_shreds,
                 &self.last_root,
                 leader_schedule,
-                is_recovered,
+                shred_source.clone(),
             ) {
                 return Err(InsertDataShredError::InvalidShred);
            }
         }

         let set_index = u64::from(shred.common_header.fec_set_index);
-        let newly_completed_data_sets =
-            self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)?;
+        let newly_completed_data_sets = self.insert_data_shred(
+            slot_meta,
+            index_meta.data_mut(),
+            &shred,
+            write_batch,
+            shred_source,
+        )?;
         just_inserted_data_shreds.insert((slot, shred_index), shred);
         index_meta_working_set_entry.did_insert_occur = true;
         slot_meta_entry.did_insert_occur = true;
@@ -1295,7 +1351,7 @@ impl Blockstore {
         just_inserted_data_shreds: &HashMap<(u64, u64), Shred>,
         last_root: &RwLock<u64>,
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
-        is_recovered: bool,
+        shred_source: ShredSource,
     ) -> bool {
         use crate::shred::SHRED_PAYLOAD_SIZE;
         let shred_index = u64::from(shred.index());
@@ -1371,8 +1427,8 @@ impl Blockstore {
                    (
                        "error",
                        format!(
-                            "Leader {:?}, slot {}: received index {} >= slot.last_index {}, is_recovered: {}",
-                            leader_pubkey, slot, shred_index, last_index, is_recovered
+                            "Leader {:?}, slot {}: received index {} >= slot.last_index {}, shred_source: {:?}",
+                            leader_pubkey, slot, shred_index, last_index, shred_source
                        ),
                        String
                    )
@@ -1407,8 +1463,8 @@ impl Blockstore {
                    (
                        "error",
                        format!(
-                            "Leader {:?}, slot {}: received shred_index {} < slot.received {}, is_recovered: {}",
-                            leader_pubkey, slot, shred_index, slot_meta.received, is_recovered
+                            "Leader {:?}, slot {}: received shred_index {} < slot.received {}, shred_source: {:?}",
+                            leader_pubkey, slot, shred_index, slot_meta.received, shred_source
                        ),
                        String
                    )
@@ -1426,6 +1482,7 @@ impl Blockstore {
         data_index: &mut ShredIndex,
         shred: &Shred,
         write_batch: &mut WriteBatch,
+        shred_source: ShredSource,
     ) -> Result<Vec<(u32, u32)>> {
         let slot = shred.slot();
         let index = u64::from(shred.index());
@@ -1476,7 +1533,30 @@ impl Blockstore {
             shred.reference_tick(),
             data_index,
         );
+        if shred_source == ShredSource::Repaired || shred_source == ShredSource::Recovered {
+            let mut slots_stats = self.slots_stats.lock().unwrap();
+            let mut e = slots_stats.stats.entry(slot_meta.slot).or_default();
+            if shred_source == ShredSource::Repaired {
+                e.num_repaired += 1;
+            }
+            if shred_source == ShredSource::Recovered {
+                e.num_recovered += 1;
+            }
+        }
         if slot_meta.is_full() {
+            let (num_repaired, num_recovered) = {
+                let mut slots_stats = self.slots_stats.lock().unwrap();
+                if let Some(e) = slots_stats.stats.remove(&slot_meta.slot) {
+                    if slots_stats.last_cleanup_ts.elapsed().as_secs() > 30 {
+                        let root = self.last_root();
+                        slots_stats.stats = slots_stats.stats.split_off(&root);
+                        slots_stats.last_cleanup_ts = Instant::now();
+                    }
+                    (e.num_repaired, e.num_recovered)
+                } else {
+                    (0, 0)
+                }
+            };
             datapoint_info!(
                 "shred_insert_is_full",
                 (
@@ -1486,6 +1566,8 @@ impl Blockstore {
                ),
                ("slot", slot_meta.slot, i64),
                ("last_index", slot_meta.last_index, i64),
+                ("num_repaired", num_repaired, i64),
+                ("num_recovered", num_recovered, i64),
            );
         }
         trace!("inserted shred into slot {:?} and index {:?}", slot, index);
@@ -5405,7 +5487,7 @@ pub mod tests {
            &HashMap::new(),
            &last_root,
            None,
-            false
+            ShredSource::Turbine
        ));

        // Ensure that an empty shred (one with no data) would get inserted. Such shreds
@@ -5428,7 +5510,7 @@ pub mod tests {
            &HashMap::new(),
            &last_root,
            None,
-            false
+            ShredSource::Repaired,
        ));
        empty_shred.data_header.size = 0;
        assert!(!blockstore.should_insert_data_shred(
@@ -5437,7 +5519,7 @@ pub mod tests {
            &HashMap::new(),
            &last_root,
            None,
-            false
+            ShredSource::Recovered,
        ));

        // Trying to insert another "is_last" shred with index < the received index should fail
@@ -5461,7 +5543,7 @@ pub mod tests {
            &HashMap::new(),
            &last_root,
            None,
-            false
+            ShredSource::Repaired,
        ));

        assert!(blockstore.has_duplicate_shreds_in_slot(0));
@@ -5482,7 +5564,7 @@ pub mod tests {
            &HashMap::new(),
            &last_root,
            None,
-            false
+            ShredSource::Repaired,
        ));
        }
        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
@@ -5550,6 +5632,7 @@ pub mod tests {
                panic!("no dupes");
            },
            false,
+            false,
        ));

        // insert again fails on dupe
@@ -5565,6 +5648,7 @@ pub mod tests {
                counter.fetch_add(1, Ordering::Relaxed);
            },
            false,
+            false,
        ));
        assert_eq!(counter.load(Ordering::Relaxed), 1);
    }
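
A note on the cleanup added to insert_data_shred above: per-slot entries below the root can never become full, so at most once every 30 seconds the map is pruned with BTreeMap::split_off(&root), which keeps only keys at or above the root. A standalone sketch of that pruning step follows; the maybe_prune helper is an illustrative wrapper around the same std calls, not code from the commit:

use std::collections::BTreeMap;
use std::time::Instant;

// Prune per-slot stats below `root` at most once every 30 seconds,
// mirroring the throttled cleanup this commit adds to insert_data_shred.
fn maybe_prune(stats: &mut BTreeMap<u64, usize>, root: u64, last_cleanup_ts: &mut Instant) {
    if last_cleanup_ts.elapsed().as_secs() > 30 {
        // split_off(&root) returns the entries with key >= root; keeping only
        // those discards slots below the root, which would otherwise
        // accumulate forever.
        let kept = stats.split_off(&root);
        *stats = kept;
        *last_cleanup_ts = Instant::now();
    }
}

fn main() {
    let mut stats: BTreeMap<u64, usize> = (0..10u64).map(|slot| (slot, 1)).collect();

    // Direct demonstration of the split_off semantics the cleanup relies on.
    stats = stats.split_off(&5);
    assert!(stats.keys().all(|&slot| slot >= 5));
    assert_eq!(stats.len(), 5);

    // The throttled variant is a no-op until 30 seconds have elapsed.
    let mut last_cleanup_ts = Instant::now();
    maybe_prune(&mut stats, 7, &mut last_cleanup_ts);
    assert_eq!(stats.len(), 5);
}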