defer HighestShred repairs during shred propagation threshold (#30142)

Jeff Biseda 2023-02-09 14:57:55 -08:00 committed by GitHub
parent db25ccba52
commit 180273b97d
6 changed files with 252 additions and 48 deletions

View File

@ -4,7 +4,7 @@ use {
serve_repair::ShredRepairType, tree_diff::TreeDiff,
},
solana_ledger::{blockstore::Blockstore, blockstore_meta::SlotMeta},
solana_sdk::{clock::Slot, hash::Hash},
solana_sdk::{clock::Slot, hash::Hash, timing::timestamp},
std::collections::{HashMap, HashSet},
};
@ -187,6 +187,7 @@ pub fn get_closest_completion(
slot,
slot_meta,
limit - repairs.len(),
timestamp(),
);
repairs.extend(new_repairs);
}
@ -199,10 +200,8 @@ pub fn get_closest_completion(
pub mod test {
use {
super::*,
solana_ledger::{
blockstore::{Blockstore, MAX_TURBINE_PROPAGATION},
get_tmp_ledger_path,
},
crate::repair_service::DEFER_REPAIR_THRESHOLD,
solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path},
solana_sdk::hash::Hash,
std::thread::sleep,
trees::{tr, Tree, TreeWalk},
@ -256,7 +255,7 @@ pub mod test {
Hash::default(),
);
let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new_from_tree(forks);
sleep(MAX_TURBINE_PROPAGATION);
sleep(DEFER_REPAIR_THRESHOLD);
let mut slot_meta_cache = HashMap::default();
let mut processed_slots = HashSet::default();
let repairs = get_closest_completion(

View File

@ -13,12 +13,19 @@ use {
crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender},
lru::LruCache,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::blockstore::{Blockstore, SlotMeta},
solana_ledger::{
blockstore::{Blockstore, SlotMeta},
shred,
},
solana_measure::measure::Measure,
solana_runtime::{bank_forks::BankForks, contains::Contains},
solana_sdk::{
clock::Slot, epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey,
clock::{Slot, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK},
epoch_schedule::EpochSchedule,
hash::Hash,
pubkey::Pubkey,
signer::keypair::Keypair,
timing::timestamp,
},
solana_streamer::sendmmsg::{batch_send, SendPktsError},
std::{
@ -34,7 +41,11 @@ use {
},
};
#[cfg(test)]
use {solana_ledger::shred::Nonce, solana_sdk::timing::timestamp};
use {solana_ledger::shred::Nonce, solana_sdk::clock::DEFAULT_MS_PER_SLOT};
// Time to defer repair requests to allow for turbine propagation
pub(crate) const DEFER_REPAIR_THRESHOLD: Duration = Duration::from_millis(200);
const DEFER_REPAIR_THRESHOLD_TICKS: u64 = DEFER_REPAIR_THRESHOLD.as_millis() as u64 / MS_PER_TICK;
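
For reference, a rough worked conversion of the threshold into ticks, assuming the stock solana_sdk clock constants (DEFAULT_TICKS_PER_SECOND = 160, so MS_PER_TICK is 1000 / 160 = 6 with integer division); the constant names below are illustrative only, not part of the change:

// Illustrative sketch only: the real values come from solana_sdk::clock.
const DEFER_MS: u64 = 200; // DEFER_REPAIR_THRESHOLD, in milliseconds
const MS_PER_TICK_ASSUMED: u64 = 1_000 / 160; // = 6 with integer division
const DEFER_TICKS: u64 = DEFER_MS / MS_PER_TICK_ASSUMED; // = 33 ticks

fn main() {
    // With the default 64 ticks per 400 ms slot, ~33 ticks is roughly half a slot of slack.
    println!("deferment ~ {DEFER_TICKS} ticks (~{} ms)", DEFER_TICKS * MS_PER_TICK_ASSUMED);
}
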
pub type DuplicateSlotsResetSender = CrossbeamSender<Vec<(Slot, Hash)>>;
pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Vec<(Slot, Hash)>>;
@ -342,6 +353,7 @@ impl RepairService {
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&duplicate_slot_repair_statuses,
timestamp(),
&mut repair_timing,
&mut best_repairs_stats,
);
@ -518,20 +530,41 @@ impl RepairService {
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
now_timestamp: u64,
) -> Vec<ShredRepairType> {
if max_repairs == 0 || slot_meta.is_full() {
vec![]
} else if slot_meta.consumed == slot_meta.received {
// Check the delay since the last received shred; defer the repair if it is too recent
if let Some(reference_tick) = slot_meta
.received
.checked_sub(1)
.and_then(|index| blockstore.get_data_shred(slot, index).ok()?)
.and_then(|shred| shred::layout::get_reference_tick(&shred).ok())
.map(u64::from)
{
let ticks_since_first_insert = DEFAULT_TICKS_PER_SECOND
* now_timestamp.saturating_sub(slot_meta.first_shred_timestamp)
/ 1_000;
if ticks_since_first_insert
< reference_tick.saturating_add(DEFER_REPAIR_THRESHOLD_TICKS)
{
return vec![];
}
}
vec![ShredRepairType::HighestShred(slot, slot_meta.received)]
} else {
let reqs = blockstore.find_missing_data_indexes(
slot,
slot_meta.first_shred_timestamp,
slot_meta.consumed,
slot_meta.received,
max_repairs,
);
reqs.into_iter()
blockstore
.find_missing_data_indexes(
slot,
now_timestamp,
slot_meta.first_shred_timestamp,
DEFER_REPAIR_THRESHOLD_TICKS,
slot_meta.consumed,
slot_meta.received,
max_repairs,
)
.into_iter()
.map(|i| ShredRepairType::Shred(slot, i))
.collect()
}
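
In prose: when consumed == received the slot has no interior holes, so the only useful request is a HighestShred probe for whatever comes after received; the new branch holds that probe back until wall-clock time has advanced DEFER_REPAIR_THRESHOLD_TICKS past the reference tick of the last shred seen, giving turbine a chance to deliver the rest. A minimal sketch of just that gate, with an illustrative helper name (should_defer_highest_shred) and the default tick constants assumed above:

const DEFAULT_TICKS_PER_SECOND: u64 = 160; // stock solana_sdk value, assumed
const DEFER_REPAIR_THRESHOLD_TICKS: u64 = 33; // ~200 ms at the default tick rate

/// Returns true while the HighestShred request should still be held back.
fn should_defer_highest_shred(
    now_timestamp_ms: u64,         // wall-clock "now", passed in so tests can fake it
    first_shred_timestamp_ms: u64, // when the slot's first shred was inserted
    last_reference_tick: u64,      // reference tick of the last received shred
) -> bool {
    // Wall-clock time since the first insert, expressed in ticks.
    let ticks_since_first_insert = DEFAULT_TICKS_PER_SECOND
        * now_timestamp_ms.saturating_sub(first_shred_timestamp_ms)
        / 1_000;
    // Still inside the propagation window past the last shred: defer the repair.
    ticks_since_first_insert < last_reference_tick.saturating_add(DEFER_REPAIR_THRESHOLD_TICKS)
}

Note that the caller samples timestamp() once per repair pass and threads it down, so every slot examined in that pass is judged against the same notion of "now".
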
@ -544,6 +577,7 @@ impl RepairService {
max_repairs: usize,
slot: Slot,
duplicate_slot_repair_statuses: &impl Contains<'a, Slot>,
now_timestamp: u64,
) {
let mut pending_slots = vec![slot];
while repairs.len() < max_repairs && !pending_slots.is_empty() {
@ -558,6 +592,7 @@ impl RepairService {
slot,
&slot_meta,
max_repairs - repairs.len(),
now_timestamp,
);
repairs.extend(new_repairs);
let next_slots = slot_meta.next_slots;
@ -595,6 +630,7 @@ impl RepairService {
slot,
&meta,
max_repairs - repairs.len(),
timestamp(),
);
repairs.extend(new_repairs);
}
@ -617,6 +653,7 @@ impl RepairService {
slot,
&slot_meta,
MAX_REPAIR_PER_DUPLICATE,
timestamp(),
))
}
} else {
@ -756,6 +793,12 @@ impl RepairService {
}
}
#[cfg(test)]
pub(crate) fn post_shred_deferment_timestamp() -> u64 {
// adjust timestamp to bypass shred deferment window
timestamp() + DEFAULT_MS_PER_SLOT + DEFER_REPAIR_THRESHOLD.as_millis() as u64
}
#[cfg(test)]
mod test {
use {
@ -808,6 +851,7 @@ mod test {
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&HashSet::default(),
timestamp(),
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
),
@ -845,6 +889,7 @@ mod test {
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&HashSet::default(),
timestamp(),
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
),
@ -907,6 +952,7 @@ mod test {
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&HashSet::default(),
timestamp(),
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
),
@ -923,6 +969,7 @@ mod test {
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&HashSet::default(),
timestamp(),
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
)[..],
@ -969,6 +1016,7 @@ mod test {
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&HashSet::default(),
post_shred_deferment_timestamp(),
&mut RepairTiming::default(),
&mut BestRepairsStats::default(),
),

View File

@ -157,6 +157,7 @@ impl RepairWeight {
max_unknown_last_index_repairs: usize,
max_closest_completion_repairs: usize,
ignore_slots: &impl Contains<'a, Slot>,
now_timestamp: u64,
repair_timing: &mut RepairTiming,
stats: &mut BestRepairsStats,
) -> Vec<ShredRepairType> {
@ -187,6 +188,7 @@ impl RepairWeight {
&mut best_shreds_repairs,
max_new_shreds,
ignore_slots,
now_timestamp,
);
let num_best_shreds_repairs = best_shreds_repairs.len();
let repair_slots_set: HashSet<Slot> =
@ -350,6 +352,7 @@ impl RepairWeight {
repairs: &mut Vec<ShredRepairType>,
max_new_shreds: usize,
ignore_slots: &impl Contains<'a, Slot>,
now_timestamp: u64,
) {
let root_tree = self.trees.get(&self.root).expect("Root tree must exist");
repair_weighted_traversal::get_best_repair_shreds(
@ -359,6 +362,7 @@ impl RepairWeight {
repairs,
max_new_shreds,
ignore_slots,
now_timestamp,
);
}

View File

@ -80,6 +80,7 @@ pub fn get_best_repair_shreds<'a>(
repairs: &mut Vec<ShredRepairType>,
max_new_shreds: usize,
ignore_slots: &impl Contains<'a, Slot>,
now_timestamp: u64,
) {
let initial_len = repairs.len();
let max_repairs = initial_len + max_new_shreds;
@ -106,6 +107,7 @@ pub fn get_best_repair_shreds<'a>(
slot,
slot_meta,
max_repairs - repairs.len(),
now_timestamp,
);
repairs.extend(new_repairs);
}
@ -127,6 +129,7 @@ pub fn get_best_repair_shreds<'a>(
max_repairs,
*new_child_slot,
ignore_slots,
now_timestamp,
);
}
visited_set.insert(*new_child_slot);
@ -141,12 +144,13 @@ pub fn get_best_repair_shreds<'a>(
pub mod test {
use {
super::*,
crate::repair_service::post_shred_deferment_timestamp,
solana_ledger::{
get_tmp_ledger_path,
shred::{Shred, ShredFlags},
},
solana_runtime::bank_utils,
solana_sdk::hash::Hash,
solana_sdk::{hash::Hash, timing::timestamp},
trees::tr,
};
@ -225,6 +229,7 @@ pub mod test {
let mut repairs = vec![];
let mut slot_meta_cache = HashMap::default();
let last_shred = blockstore.meta(0).unwrap().unwrap().received;
get_best_repair_shreds(
&heaviest_subtree_fork_choice,
&blockstore,
@ -232,6 +237,18 @@ pub mod test {
&mut repairs,
6,
&HashSet::default(),
timestamp().saturating_sub(1_000 * 60), // ensure deferment
);
assert_eq!(repairs, vec![]);
get_best_repair_shreds(
&heaviest_subtree_fork_choice,
&blockstore,
&mut slot_meta_cache,
&mut repairs,
6,
&HashSet::default(),
post_shred_deferment_timestamp(),
);
assert_eq!(
repairs,
@ -261,6 +278,7 @@ pub mod test {
&mut repairs,
6,
&HashSet::default(),
post_shred_deferment_timestamp(),
);
assert_eq!(
repairs,
@ -301,6 +319,7 @@ pub mod test {
&mut repairs,
4,
&HashSet::default(),
post_shred_deferment_timestamp(),
);
assert_eq!(
repairs,
@ -322,6 +341,7 @@ pub mod test {
&mut repairs,
4,
&HashSet::default(),
post_shred_deferment_timestamp(),
);
assert_eq!(
repairs,
@ -338,6 +358,7 @@ pub mod test {
// Add a branch to slot 2, make sure it doesn't repair child
// 4 again when the Unvisited(2) event happens
blockstore.add_tree(tr(2) / (tr(6) / tr(7)), true, false, 2, Hash::default());
let mut repairs = vec![];
let mut slot_meta_cache = HashMap::default();
get_best_repair_shreds(
@ -347,6 +368,7 @@ pub mod test {
&mut repairs,
std::usize::MAX,
&HashSet::default(),
post_shred_deferment_timestamp(),
);
let last_shred = blockstore.meta(0).unwrap().unwrap().received;
assert_eq!(
@ -375,6 +397,7 @@ pub mod test {
&mut repairs,
std::usize::MAX,
&ignore_set,
post_shred_deferment_timestamp(),
);
assert_eq!(
repairs,
@ -397,6 +420,7 @@ pub mod test {
&mut repairs,
std::usize::MAX,
&ignore_set,
post_shred_deferment_timestamp(),
);
assert_eq!(
repairs,
@ -418,6 +442,7 @@ pub mod test {
&mut repairs,
std::usize::MAX,
&ignore_set,
post_shred_deferment_timestamp(),
);
assert_eq!(
repairs,

View File

@ -41,7 +41,7 @@ use {
solana_rayon_threadlimit::get_max_thread_count,
solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
solana_sdk::{
clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK},
clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND},
genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE},
hash::Hash,
pubkey::Pubkey,
@ -70,7 +70,6 @@ use {
atomic::{AtomicBool, Ordering},
Arc, Mutex, RwLock, RwLockWriteGuard,
},
time::Duration,
},
tempfile::{Builder, TempDir},
thiserror::Error,
@ -104,9 +103,6 @@ lazy_static! {
pub const MAX_REPLAY_WAKE_UP_SIGNALS: usize = 1;
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
pub const MAX_TURBINE_PROPAGATION: Duration = Duration::from_millis(200);
pub const MAX_TURBINE_DELAY_IN_TICKS: u64 =
MAX_TURBINE_PROPAGATION.as_millis() as u64 / MS_PER_TICK;
// An upper bound on maximum number of data shreds we can handle in a slot
// 32K shreds would allow ~320K peak TPS
@ -1802,7 +1798,9 @@ impl Blockstore {
fn find_missing_indexes<C>(
db_iterator: &mut DBRawIterator,
slot: Slot,
now_timestamp: u64,
first_timestamp: u64,
defer_threshold_ticks: u64,
start_index: u64,
end_index: u64,
max_missing: usize,
@ -1816,7 +1814,7 @@ impl Blockstore {
let mut missing_indexes = vec![];
let ticks_since_first_insert =
DEFAULT_TICKS_PER_SECOND * (timestamp() - first_timestamp) / 1000;
DEFAULT_TICKS_PER_SECOND * (now_timestamp - first_timestamp) / 1000;
// Seek to the first shred with index >= start_index
db_iterator.seek(&C::key((slot, start_index)));
@ -1847,7 +1845,7 @@ impl Blockstore {
// the tick that will be used to figure out the timeout for this hole
let data = db_iterator.value().expect("couldn't read value");
let reference_tick = u64::from(shred::layout::get_reference_tick(data).unwrap());
if ticks_since_first_insert < reference_tick + MAX_TURBINE_DELAY_IN_TICKS {
if ticks_since_first_insert < reference_tick + defer_threshold_ticks {
// The higher index holes have not timed out yet
break 'outer;
}
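
A worked example of the same timeout rule as applied to interior holes in find_missing_indexes, using illustrative numbers and the default tick rate assumed earlier (the helper name hole_timed_out is not from the change); most of the test call sites below pass defer_threshold_ticks = 0, which makes every hole time out immediately:

// Illustrative only: mirrors the `ticks_since_first_insert < reference_tick +
// defer_threshold_ticks` comparison above, inverted to "has the hole timed out".
fn hole_timed_out(ticks_since_first_insert: u64, reference_tick: u64, defer_threshold_ticks: u64) -> bool {
    ticks_since_first_insert >= reference_tick + defer_threshold_ticks
}

fn main() {
    let defer_threshold_ticks = 33; // ~200 ms at 160 ticks/second
    // Shred authored 5 ticks into the slot, 300 ms since the first insert:
    let elapsed_ticks = 160 * 300 / 1_000; // 48 ticks
    assert!(hole_timed_out(elapsed_ticks, 5, defer_threshold_ticks)); // 48 >= 38: repair it
    // Only 200 ms elapsed: 32 ticks is still inside the window, so skip for now.
    let elapsed_ticks = 160 * 200 / 1_000; // 32 ticks
    assert!(!hole_timed_out(elapsed_ticks, 5, defer_threshold_ticks));
}
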
@ -1876,7 +1874,9 @@ impl Blockstore {
pub fn find_missing_data_indexes(
&self,
slot: Slot,
now_timestamp: u64,
first_timestamp: u64,
defer_threshold_ticks: u64,
start_index: u64,
end_index: u64,
max_missing: usize,
@ -1888,7 +1888,9 @@ impl Blockstore {
Self::find_missing_indexes::<cf::ShredData>(
&mut db_iterator,
slot,
now_timestamp,
first_timestamp,
defer_threshold_ticks,
start_index,
end_index,
max_missing,
@ -5894,27 +5896,75 @@ pub mod tests {
// range of [0, gap)
let expected: Vec<u64> = (1..gap).collect();
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 0, gap, gap as usize),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
0, // start_index
gap, // end_index
gap as usize, // max_missing
),
expected
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 1, gap, (gap - 1) as usize),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
1, // start_index
gap, // end_index
(gap - 1) as usize, // max_missing
),
expected,
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 0, gap - 1, (gap - 1) as usize),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
0, // start_index
gap - 1, // end_index
(gap - 1) as usize, // max_missing
),
&expected[..expected.len() - 1],
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, gap - 2, gap, gap as usize),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
gap - 2, // start_index
gap, // end_index
gap as usize, // max_missing
),
vec![gap - 2, gap - 1],
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, gap - 2, gap, 1),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
gap - 2, // start_index
gap, // end_index
1, // max_missing
),
vec![gap - 2],
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 0, gap, 1),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
0, // start_index
gap, // end_index
1, // max_missing
),
vec![1],
);
@ -5923,11 +5973,27 @@ pub mod tests {
let mut expected: Vec<u64> = (1..gap).collect();
expected.push(gap + 1);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap + 2) as usize),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
0, // start_index
gap + 2, // end_index
(gap + 2) as usize, // max_missing
),
expected,
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap - 1) as usize),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
0, // start_index
gap + 2, // end_index
(gap - 1) as usize, // max_missing
),
&expected[..expected.len() - 1],
);
@ -5943,10 +6009,12 @@ pub mod tests {
assert_eq!(
blockstore.find_missing_data_indexes(
slot,
0,
j * gap,
i * gap,
((i - j) * gap) as usize
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
j * gap, // start_index
i * gap, // end_index
((i - j) * gap) as usize, // max_missing
),
expected,
);
@ -5980,12 +6048,28 @@ pub mod tests {
let empty: Vec<u64> = vec![];
assert_eq!(
blockstore.find_missing_data_indexes(slot, timestamp(), 0, 50, 1),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
timestamp(), // first_timestamp
0, // defer_threshold_ticks
0, // start_index
50, // end_index
1, // max_missing
),
empty
);
let expected: Vec<_> = (1..=9).collect();
assert_eq!(
blockstore.find_missing_data_indexes(slot, timestamp() - 400, 0, 50, 9),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
timestamp() - 400, // first_timestamp
0, // defer_threshold_ticks
0, // start_index
50, // end_index
9, // max_missing
),
expected
);
}
@ -6000,19 +6084,51 @@ pub mod tests {
// Early exit conditions
let empty: Vec<u64> = vec![];
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 0, 0, 1),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
0, // start_index
0, // end_index
1, // max_missing
),
empty
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 5, 5, 1),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
5, // start_index
5, // end_index
1, // max_missing
),
empty
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 4, 3, 1),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
4, // start_index
3, // end_index
1, // max_missing
),
empty
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 1, 2, 0),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
1, // start_index
2, // end_index
0, // max_missing
),
empty
);
@ -6039,9 +6155,13 @@ pub mod tests {
// [i, first_index - 1]
for start in 0..STARTS {
let result = blockstore.find_missing_data_indexes(
slot, 0, start, // start
END, //end
MAX, //max
slot,
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
start, // start_index
END, // end_index
MAX, // max_missing
);
let expected: Vec<u64> = (start..END).filter(|i| *i != ONE && *i != OTHER).collect();
assert_eq!(result, expected);
@ -6067,7 +6187,15 @@ pub mod tests {
for i in 0..num_shreds as u64 {
for j in 0..i {
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, j, i, (i - j) as usize),
blockstore.find_missing_data_indexes(
slot,
timestamp(),
0, // first_timestamp
0, // defer_threshold_ticks
j, // start_index
i, // end_index
(i - j) as usize, // max_missing
),
empty
);
}

View File

@ -668,7 +668,7 @@ pub mod layout {
(offsets.end <= shred.len()).then_some(offsets)
}
pub(crate) fn get_reference_tick(shred: &[u8]) -> Result<u8, Error> {
pub fn get_reference_tick(shred: &[u8]) -> Result<u8, Error> {
if get_shred_type(shred)? != ShredType::Data {
return Err(Error::InvalidShredType);
}
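
The visibility widens from pub(crate) to pub because the new caller lives in a different crate: repair_service imports solana_ledger::shred and reads the reference tick straight off the raw data-shred bytes it fetched from the blockstore. A minimal hedged usage sketch (the wrapper name reference_tick_of is illustrative):

use solana_ledger::shred;

// `payload` is the raw byte payload of a data shred read from the blockstore.
// get_reference_tick returns the u8 tick recorded when the shred was produced,
// and errors out for coding shreds (ShredType != Data) or malformed payloads.
fn reference_tick_of(payload: &[u8]) -> Option<u64> {
    shred::layout::get_reference_tick(payload).ok().map(u64::from)
}
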