Upgrade Repair to be more intelligent and aggressive (#6789)
* Upgrade Repair to be more intelligent and aggressive * Fix u64 casts * Fix missing bracket * Add 1 second delay to test to allow repair to kick in
This commit is contained in:
parent
a9c4cd6cbe
commit
67d1e2903c
|
@ -834,7 +834,7 @@ impl ClusterInfo {
|
|||
}
|
||||
pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> {
|
||||
match repair_request {
|
||||
RepairType::Blob(slot, blob_index) => {
|
||||
RepairType::Shred(slot, blob_index) => {
|
||||
datapoint_debug!(
|
||||
"cluster_info-repair",
|
||||
("repair-slot", *slot, i64),
|
||||
|
@ -1896,7 +1896,7 @@ mod tests {
|
|||
fn window_index_request() {
|
||||
let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
|
||||
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(me);
|
||||
let rv = cluster_info.repair_request(&RepairType::Blob(0, 0));
|
||||
let rv = cluster_info.repair_request(&RepairType::Shred(0, 0));
|
||||
assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
|
||||
|
||||
let gossip_addr = socketaddr!([127, 0, 0, 1], 1234);
|
||||
|
@ -1915,7 +1915,7 @@ mod tests {
|
|||
);
|
||||
cluster_info.insert_info(nxt.clone());
|
||||
let rv = cluster_info
|
||||
.repair_request(&RepairType::Blob(0, 0))
|
||||
.repair_request(&RepairType::Shred(0, 0))
|
||||
.unwrap();
|
||||
assert_eq!(nxt.gossip, gossip_addr);
|
||||
assert_eq!(rv.0, nxt.gossip);
|
||||
|
@ -1940,7 +1940,7 @@ mod tests {
|
|||
while !one || !two {
|
||||
//this randomly picks an option, so eventually it should pick both
|
||||
let rv = cluster_info
|
||||
.repair_request(&RepairType::Blob(0, 0))
|
||||
.repair_request(&RepairType::Shred(0, 0))
|
||||
.unwrap();
|
||||
if rv.0 == gossip_addr {
|
||||
one = true;
|
||||
|
|
|
@ -20,8 +20,8 @@ use std::{
|
|||
time::Duration,
|
||||
};
|
||||
|
||||
pub const MAX_REPAIR_LENGTH: usize = 16;
|
||||
pub const REPAIR_MS: u64 = 100;
|
||||
pub const MAX_REPAIR_LENGTH: usize = 1024;
|
||||
pub const REPAIR_MS: u64 = 50;
|
||||
pub const MAX_ORPHANS: usize = 5;
|
||||
|
||||
pub enum RepairStrategy {
|
||||
|
@ -37,7 +37,7 @@ pub enum RepairStrategy {
|
|||
pub enum RepairType {
|
||||
Orphan(u64),
|
||||
HighestBlob(u64, u64),
|
||||
Blob(u64, u64),
|
||||
Shred(u64, u64),
|
||||
}
|
||||
|
||||
pub struct RepairSlotRange {
|
||||
|
@ -254,13 +254,13 @@ impl RepairService {
|
|||
} else {
|
||||
let reqs = blocktree.find_missing_data_indexes(
|
||||
slot,
|
||||
slot_meta.first_shred_timestamp,
|
||||
slot_meta.consumed,
|
||||
slot_meta.received,
|
||||
max_repairs,
|
||||
);
|
||||
|
||||
reqs.into_iter()
|
||||
.map(|i| RepairType::Blob(slot, i))
|
||||
.map(|i| RepairType::Shred(slot, i))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
@ -480,12 +480,13 @@ mod test {
|
|||
}
|
||||
}
|
||||
blocktree.insert_shreds(shreds_to_write, None).unwrap();
|
||||
|
||||
// sleep so that the holes are ready for repair
|
||||
sleep(Duration::from_secs(1));
|
||||
let expected: Vec<RepairType> = (0..num_slots)
|
||||
.flat_map(|slot| {
|
||||
missing_indexes_per_slot
|
||||
.iter()
|
||||
.map(move |blob_index| RepairType::Blob(slot as u64, *blob_index))
|
||||
.map(move |blob_index| RepairType::Shred(slot as u64, *blob_index))
|
||||
})
|
||||
.collect();
|
||||
|
||||
|
@ -545,7 +546,8 @@ mod test {
|
|||
slot_shreds.remove(0);
|
||||
blocktree.insert_shreds(slot_shreds, None).unwrap();
|
||||
}
|
||||
|
||||
// sleep to make slot eligible for repair
|
||||
sleep(Duration::from_secs(1));
|
||||
// Iterate through all possible combinations of start..end (inclusive on both
|
||||
// sides of the range)
|
||||
for start in 0..slots.len() {
|
||||
|
@ -557,7 +559,7 @@ mod test {
|
|||
..=repair_slot_range.end)
|
||||
.map(|slot_index| {
|
||||
if slots.contains(&(slot_index as u64)) {
|
||||
RepairType::Blob(slot_index as u64, 0)
|
||||
RepairType::Shred(slot_index as u64, 0)
|
||||
} else {
|
||||
RepairType::HighestBlob(slot_index as u64, 0)
|
||||
}
|
||||
|
|
|
@ -20,10 +20,11 @@ use rocksdb::DBRawIterator;
|
|||
use solana_measure::measure::Measure;
|
||||
use solana_metrics::{datapoint_debug, datapoint_error};
|
||||
use solana_rayon_threadlimit::get_thread_count;
|
||||
use solana_sdk::clock::Slot;
|
||||
use solana_sdk::clock::{Slot, DEFAULT_TICKS_PER_SECOND};
|
||||
use solana_sdk::genesis_block::GenesisBlock;
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use solana_sdk::timing::timestamp;
|
||||
use std::cell::RefCell;
|
||||
use std::cmp;
|
||||
use std::collections::HashMap;
|
||||
|
@ -41,6 +42,7 @@ thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::
|
|||
.unwrap()));
|
||||
|
||||
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
|
||||
pub const MAX_TURBINE_PROPAGATION_DELAY_TICKS: u64 = 16;
|
||||
|
||||
pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
|
||||
|
||||
|
@ -827,6 +829,7 @@ impl Blocktree {
|
|||
slot_meta,
|
||||
index as u32,
|
||||
new_consumed,
|
||||
shred.reference_tick(),
|
||||
);
|
||||
data_index.set_present(index, true);
|
||||
trace!("inserted shred into slot {:?} and index {:?}", slot, index);
|
||||
|
@ -965,6 +968,7 @@ impl Blocktree {
|
|||
fn find_missing_indexes<C>(
|
||||
db_iterator: &mut DBRawIterator,
|
||||
slot: Slot,
|
||||
first_timestamp: u64,
|
||||
start_index: u64,
|
||||
end_index: u64,
|
||||
max_missing: usize,
|
||||
|
@ -977,6 +981,8 @@ impl Blocktree {
|
|||
}
|
||||
|
||||
let mut missing_indexes = vec![];
|
||||
let ticks_since_first_insert =
|
||||
DEFAULT_TICKS_PER_SECOND * (timestamp() - first_timestamp) / 1000;
|
||||
|
||||
// Seek to the first shred with index >= start_index
|
||||
db_iterator.seek(&C::key((slot, start_index)));
|
||||
|
@ -1004,7 +1010,15 @@ impl Blocktree {
|
|||
};
|
||||
|
||||
let upper_index = cmp::min(current_index, end_index);
|
||||
// the tick that will be used to figure out the timeout for this hole
|
||||
let reference_tick = u64::from(Shred::reference_tick_from_data(
|
||||
&db_iterator.value().expect("couldn't read value"),
|
||||
));
|
||||
|
||||
if ticks_since_first_insert < reference_tick + MAX_TURBINE_PROPAGATION_DELAY_TICKS {
|
||||
// The higher index holes have not timed out yet
|
||||
break 'outer;
|
||||
}
|
||||
for i in prev_index..upper_index {
|
||||
missing_indexes.push(i);
|
||||
if missing_indexes.len() == max_missing {
|
||||
|
@ -1030,6 +1044,7 @@ impl Blocktree {
|
|||
pub fn find_missing_data_indexes(
|
||||
&self,
|
||||
slot: Slot,
|
||||
first_timestamp: u64,
|
||||
start_index: u64,
|
||||
end_index: u64,
|
||||
max_missing: usize,
|
||||
|
@ -1041,6 +1056,7 @@ impl Blocktree {
|
|||
Self::find_missing_indexes::<cf::ShredData>(
|
||||
&mut db_iterator,
|
||||
slot,
|
||||
first_timestamp,
|
||||
start_index,
|
||||
end_index,
|
||||
max_missing,
|
||||
|
@ -1305,10 +1321,17 @@ fn update_slot_meta(
|
|||
slot_meta: &mut SlotMeta,
|
||||
index: u32,
|
||||
new_consumed: u64,
|
||||
reference_tick: u8,
|
||||
) {
|
||||
let maybe_first_insert = slot_meta.received == 0;
|
||||
// Index is zero-indexed, while the "received" height starts from 1,
|
||||
// so received = index + 1 for the same shred.
|
||||
slot_meta.received = cmp::max((u64::from(index) + 1) as u64, slot_meta.received);
|
||||
if maybe_first_insert && slot_meta.received > 0 {
|
||||
// predict the timestamp of what would have been the first shred in this slot
|
||||
let slot_time_elapsed = u64::from(reference_tick) * 1000 / DEFAULT_TICKS_PER_SECOND;
|
||||
slot_meta.first_shred_timestamp = timestamp() - slot_time_elapsed;
|
||||
}
|
||||
slot_meta.consumed = new_consumed;
|
||||
slot_meta.last_index = {
|
||||
// If the last index in the slot hasn't been set before, then
|
||||
|
@ -3160,27 +3183,27 @@ pub mod tests {
|
|||
// range of [0, gap)
|
||||
let expected: Vec<u64> = (1..gap).collect();
|
||||
assert_eq!(
|
||||
blocktree.find_missing_data_indexes(slot, 0, gap, gap as usize),
|
||||
blocktree.find_missing_data_indexes(slot, 0, 0, gap, gap as usize),
|
||||
expected
|
||||
);
|
||||
assert_eq!(
|
||||
blocktree.find_missing_data_indexes(slot, 1, gap, (gap - 1) as usize),
|
||||
blocktree.find_missing_data_indexes(slot, 0, 1, gap, (gap - 1) as usize),
|
||||
expected,
|
||||
);
|
||||
assert_eq!(
|
||||
blocktree.find_missing_data_indexes(slot, 0, gap - 1, (gap - 1) as usize),
|
||||
blocktree.find_missing_data_indexes(slot, 0, 0, gap - 1, (gap - 1) as usize),
|
||||
&expected[..expected.len() - 1],
|
||||
);
|
||||
assert_eq!(
|
||||
blocktree.find_missing_data_indexes(slot, gap - 2, gap, gap as usize),
|
||||
blocktree.find_missing_data_indexes(slot, 0, gap - 2, gap, gap as usize),
|
||||
vec![gap - 2, gap - 1],
|
||||
);
|
||||
assert_eq!(
|
||||
blocktree.find_missing_data_indexes(slot, gap - 2, gap, 1),
|
||||
blocktree.find_missing_data_indexes(slot, 0, gap - 2, gap, 1),
|
||||
vec![gap - 2],
|
||||
);
|
||||
assert_eq!(
|
||||
blocktree.find_missing_data_indexes(slot, 0, gap, 1),
|
||||
blocktree.find_missing_data_indexes(slot, 0, 0, gap, 1),
|
||||
vec![1],
|
||||
);
|
||||
|
||||
|
@ -3189,11 +3212,11 @@ pub mod tests {
|
|||
let mut expected: Vec<u64> = (1..gap).collect();
|
||||
expected.push(gap + 1);
|
||||
assert_eq!(
|
||||
blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap + 2) as usize),
|
||||
blocktree.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap + 2) as usize),
|
||||
expected,
|
||||
);
|
||||
assert_eq!(
|
||||
blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap - 1) as usize),
|
||||
blocktree.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap - 1) as usize),
|
||||
&expected[..expected.len() - 1],
|
||||
);
|
||||
|
||||
|
@ -3209,6 +3232,7 @@ pub mod tests {
|
|||
assert_eq!(
|
||||
blocktree.find_missing_data_indexes(
|
||||
slot,
|
||||
0,
|
||||
j * gap,
|
||||
i * gap,
|
||||
((i - j) * gap) as usize
|
||||
|
@ -3222,6 +3246,34 @@ pub mod tests {
|
|||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_missing_data_indexes_timeout() {
|
||||
let slot = 0;
|
||||
let blocktree_path = get_tmp_ledger_path!();
|
||||
let blocktree = Blocktree::open(&blocktree_path).unwrap();
|
||||
|
||||
// Write entries
|
||||
let gap: u64 = 10;
|
||||
let shreds: Vec<_> = (0..64)
|
||||
.map(|i| Shred::new_from_data(slot, (i * gap) as u32, 0, None, false, false, i as u8))
|
||||
.collect();
|
||||
blocktree.insert_shreds(shreds, None).unwrap();
|
||||
|
||||
let empty: Vec<u64> = vec![];
|
||||
assert_eq!(
|
||||
blocktree.find_missing_data_indexes(slot, timestamp(), 0, 50, 1),
|
||||
empty
|
||||
);
|
||||
let expected: Vec<_> = (1..=9).collect();
|
||||
assert_eq!(
|
||||
blocktree.find_missing_data_indexes(slot, timestamp() - 400, 0, 50, 9),
|
||||
expected
|
||||
);
|
||||
|
||||
drop(blocktree);
|
||||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_missing_data_indexes_sanity() {
|
||||
let slot = 0;
|
||||
|
@ -3231,10 +3283,10 @@ pub mod tests {
|
|||
|
||||
// Early exit conditions
|
||||
let empty: Vec<u64> = vec![];
|
||||
assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 0, 1), empty);
|
||||
assert_eq!(blocktree.find_missing_data_indexes(slot, 5, 5, 1), empty);
|
||||
assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty);
|
||||
assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty);
|
||||
assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 0, 0, 1), empty);
|
||||
assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 5, 5, 1), empty);
|
||||
assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 4, 3, 1), empty);
|
||||
assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 1, 2, 0), empty);
|
||||
|
||||
let entries = create_ticks(100, 0, Hash::default());
|
||||
let mut shreds = entries_to_test_shreds(entries, slot, 0, true);
|
||||
|
@ -3258,7 +3310,7 @@ pub mod tests {
|
|||
// [i, first_index - 1]
|
||||
for start in 0..STARTS {
|
||||
let result = blocktree.find_missing_data_indexes(
|
||||
slot, start, // start
|
||||
slot, 0, start, // start
|
||||
END, //end
|
||||
MAX, //max
|
||||
);
|
||||
|
@ -3288,7 +3340,7 @@ pub mod tests {
|
|||
for i in 0..num_shreds as u64 {
|
||||
for j in 0..i {
|
||||
assert_eq!(
|
||||
blocktree.find_missing_data_indexes(slot, j, i, (i - j) as usize),
|
||||
blocktree.find_missing_data_indexes(slot, 0, j, i, (i - j) as usize),
|
||||
empty
|
||||
);
|
||||
}
|
||||
|
|
|
@ -13,11 +13,13 @@ pub struct SlotMeta {
|
|||
// The total number of consecutive blobs starting from index 0
|
||||
// we have received for this slot.
|
||||
pub consumed: u64,
|
||||
// The index *plus one* of the highest blob received for this slot. Useful
|
||||
// for checking if the slot has received any blobs yet, and to calculate the
|
||||
// The index *plus one* of the highest shred received for this slot. Useful
|
||||
// for checking if the slot has received any shreds yet, and to calculate the
|
||||
// range where there is one or more holes: `(consumed..received)`.
|
||||
pub received: u64,
|
||||
// The index of the blob that is flagged as the last blob for this slot.
|
||||
// The timestamp of the first time a shred was added for this slot
|
||||
pub first_shred_timestamp: u64,
|
||||
// The index of the shred that is flagged as the last shred for this slot.
|
||||
pub last_index: u64,
|
||||
// The slot height of the block this one derives from.
|
||||
pub parent_slot: Slot,
|
||||
|
@ -32,7 +34,7 @@ pub struct SlotMeta {
|
|||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
|
||||
/// Index recording presence/absence of blobs
|
||||
/// Index recording presence/absence of shreds
|
||||
pub struct Index {
|
||||
pub slot: Slot,
|
||||
data: DataIndex,
|
||||
|
@ -41,14 +43,14 @@ pub struct Index {
|
|||
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
|
||||
pub struct DataIndex {
|
||||
/// Map representing presence/absence of data blobs
|
||||
/// Map representing presence/absence of data shreds
|
||||
index: BTreeSet<u64>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
|
||||
/// Erasure coding information
|
||||
pub struct CodingIndex {
|
||||
/// Map from set index, to hashmap from blob index to presence bool
|
||||
/// Map from set index, to hashmap from shred index to presence bool
|
||||
index: BTreeSet<u64>,
|
||||
}
|
||||
|
||||
|
@ -146,8 +148,8 @@ impl DataIndex {
|
|||
impl SlotMeta {
|
||||
pub fn is_full(&self) -> bool {
|
||||
// last_index is std::u64::MAX when it has no information about how
|
||||
// many blobs will fill this slot.
|
||||
// Note: A full slot with zero blobs is not possible.
|
||||
// many shreds will fill this slot.
|
||||
// Note: A full slot with zero shreds is not possible.
|
||||
if self.last_index == std::u64::MAX {
|
||||
return false;
|
||||
}
|
||||
|
@ -180,6 +182,7 @@ impl SlotMeta {
|
|||
slot,
|
||||
consumed: 0,
|
||||
received: 0,
|
||||
first_shred_timestamp: 0,
|
||||
parent_slot,
|
||||
next_slots: vec![],
|
||||
is_connected: slot == 0,
|
||||
|
|
|
@ -19,6 +19,7 @@ use solana_sdk::{
|
|||
pubkey::Pubkey,
|
||||
signature::{Keypair, KeypairUtil, Signature},
|
||||
};
|
||||
use std::mem::size_of;
|
||||
use std::{sync::Arc, time::Instant};
|
||||
|
||||
/// The following constants are computed by hand, and hardcoded.
|
||||
|
@ -347,6 +348,11 @@ impl Shred {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn reference_tick_from_data(data: &[u8]) -> u8 {
|
||||
let flags = data[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER - size_of::<u8>()];
|
||||
flags & SHRED_TICK_REFERENCE_MASK
|
||||
}
|
||||
|
||||
pub fn verify(&self, pubkey: &Pubkey) -> bool {
|
||||
self.signature()
|
||||
.verify(pubkey.as_ref(), &self.payload[SIZE_OF_SIGNATURE..])
|
||||
|
@ -963,6 +969,7 @@ pub mod tests {
|
|||
let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
|
||||
data_shreds.iter().for_each(|s| {
|
||||
assert_eq!(s.reference_tick(), 5);
|
||||
assert_eq!(Shred::reference_tick_from_data(&s.payload), 5);
|
||||
});
|
||||
|
||||
let deserialized_shred =
|
||||
|
@ -992,6 +999,10 @@ pub mod tests {
|
|||
let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
|
||||
data_shreds.iter().for_each(|s| {
|
||||
assert_eq!(s.reference_tick(), SHRED_TICK_REFERENCE_MASK);
|
||||
assert_eq!(
|
||||
Shred::reference_tick_from_data(&s.payload),
|
||||
SHRED_TICK_REFERENCE_MASK
|
||||
);
|
||||
});
|
||||
|
||||
let deserialized_shred =
|
||||
|
|
Loading…
Reference in New Issue