From 67d1e2903c6067abe08b211a18ba719a63f141b2 Mon Sep 17 00:00:00 2001
From: Sagar Dhawan
Date: Thu, 7 Nov 2019 11:08:09 -0800
Subject: [PATCH] Upgrade Repair to be more intelligent and aggressive (#6789)

* Upgrade Repair to be more intelligent and aggressive
* Fix u64 casts
* Fix missing bracket
* Add 1 second delay to test to allow repair to kick in
---
 core/src/cluster_info.rs     |  8 ++--
 core/src/repair_service.rs   | 20 +++++----
 ledger/src/blocktree.rs      | 82 +++++++++++++++++++++++++++++-------
 ledger/src/blocktree_meta.rs | 19 +++++----
 ledger/src/shred.rs          | 11 +++++
 5 files changed, 104 insertions(+), 36 deletions(-)

diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index e0a0abc24..f294d5781 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -834,7 +834,7 @@ impl ClusterInfo {
     }
     pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> {
         match repair_request {
-            RepairType::Blob(slot, blob_index) => {
+            RepairType::Shred(slot, blob_index) => {
                 datapoint_debug!(
                     "cluster_info-repair",
                     ("repair-slot", *slot, i64),
@@ -1896,7 +1896,7 @@ mod tests {
     fn window_index_request() {
         let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
         let mut cluster_info = ClusterInfo::new_with_invalid_keypair(me);
-        let rv = cluster_info.repair_request(&RepairType::Blob(0, 0));
+        let rv = cluster_info.repair_request(&RepairType::Shred(0, 0));
         assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));

         let gossip_addr = socketaddr!([127, 0, 0, 1], 1234);
@@ -1915,7 +1915,7 @@ mod tests {
         );
         cluster_info.insert_info(nxt.clone());
         let rv = cluster_info
-            .repair_request(&RepairType::Blob(0, 0))
+            .repair_request(&RepairType::Shred(0, 0))
             .unwrap();
         assert_eq!(nxt.gossip, gossip_addr);
         assert_eq!(rv.0, nxt.gossip);
@@ -1940,7 +1940,7 @@ mod tests {
         while !one || !two {
             //this randomly picks an option, so eventually it should pick both
             let rv = cluster_info
-                .repair_request(&RepairType::Blob(0, 0))
+                .repair_request(&RepairType::Shred(0, 0))
                 .unwrap();
             if rv.0 == gossip_addr {
                 one = true;
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index dce93bd2d..443fe6091 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -20,8 +20,8 @@ use std::{
     time::Duration,
 };

-pub const MAX_REPAIR_LENGTH: usize = 16;
-pub const REPAIR_MS: u64 = 100;
+pub const MAX_REPAIR_LENGTH: usize = 1024;
+pub const REPAIR_MS: u64 = 50;
 pub const MAX_ORPHANS: usize = 5;

 pub enum RepairStrategy {
@@ -37,7 +37,7 @@ pub enum RepairStrategy {
 pub enum RepairType {
     Orphan(u64),
     HighestBlob(u64, u64),
-    Blob(u64, u64),
+    Shred(u64, u64),
 }

 pub struct RepairSlotRange {
@@ -254,13 +254,13 @@ impl RepairService {
         } else {
             let reqs = blocktree.find_missing_data_indexes(
                 slot,
+                slot_meta.first_shred_timestamp,
                 slot_meta.consumed,
                 slot_meta.received,
                 max_repairs,
             );
-
             reqs.into_iter()
-                .map(|i| RepairType::Blob(slot, i))
+                .map(|i| RepairType::Shred(slot, i))
                 .collect()
         }
     }
@@ -480,12 +480,13 @@ mod test {
                 }
             }
             blocktree.insert_shreds(shreds_to_write, None).unwrap();
-
+            // sleep so that the holes are ready for repair
+            sleep(Duration::from_secs(1));
             let expected: Vec<RepairType> = (0..num_slots)
                 .flat_map(|slot| {
                     missing_indexes_per_slot
                         .iter()
-                        .map(move |blob_index| RepairType::Blob(slot as u64, *blob_index))
+                        .map(move |blob_index| RepairType::Shred(slot as u64, *blob_index))
                 })
                 .collect();

@@ -545,7 +546,8 @@ mod test {
                 slot_shreds.remove(0);
                 blocktree.insert_shreds(slot_shreds, None).unwrap();
             }
-
+            // sleep to make slot eligible for repair
+            sleep(Duration::from_secs(1));
             // Iterate through all possible combinations of start..end (inclusive on both
             // sides of the range)
             for start in 0..slots.len() {
@@ -557,7 +559,7 @@ mod test {
                     ..=repair_slot_range.end)
                     .map(|slot_index| {
                         if slots.contains(&(slot_index as u64)) {
-                            RepairType::Blob(slot_index as u64, 0)
+                            RepairType::Shred(slot_index as u64, 0)
                         } else {
                             RepairType::HighestBlob(slot_index as u64, 0)
                         }
diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs
index 6ac500583..e1bfc59e2 100644
--- a/ledger/src/blocktree.rs
+++ b/ledger/src/blocktree.rs
@@ -20,10 +20,11 @@ use rocksdb::DBRawIterator;
 use solana_measure::measure::Measure;
 use solana_metrics::{datapoint_debug, datapoint_error};
 use solana_rayon_threadlimit::get_thread_count;
-use solana_sdk::clock::Slot;
+use solana_sdk::clock::{Slot, DEFAULT_TICKS_PER_SECOND};
 use solana_sdk::genesis_block::GenesisBlock;
 use solana_sdk::hash::Hash;
 use solana_sdk::signature::{Keypair, KeypairUtil};
+use solana_sdk::timing::timestamp;
 use std::cell::RefCell;
 use std::cmp;
 use std::collections::HashMap;
@@ -41,6 +42,7 @@ thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::
                     .unwrap()));

 pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
+pub const MAX_TURBINE_PROPAGATION_DELAY_TICKS: u64 = 16;

 pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
@@ -827,6 +829,7 @@ impl Blocktree {
             slot_meta,
             index as u32,
             new_consumed,
+            shred.reference_tick(),
         );
         data_index.set_present(index, true);
         trace!("inserted shred into slot {:?} and index {:?}", slot, index);
@@ -965,6 +968,7 @@ impl Blocktree {
     fn find_missing_indexes<C>(
         db_iterator: &mut DBRawIterator,
         slot: Slot,
+        first_timestamp: u64,
         start_index: u64,
         end_index: u64,
         max_missing: usize,
@@ -977,6 +981,8 @@ impl Blocktree {
         }

         let mut missing_indexes = vec![];
+        let ticks_since_first_insert =
+            DEFAULT_TICKS_PER_SECOND * (timestamp() - first_timestamp) / 1000;

         // Seek to the first shred with index >= start_index
         db_iterator.seek(&C::key((slot, start_index)));
@@ -1004,7 +1010,15 @@ impl Blocktree {
             };
             let upper_index = cmp::min(current_index, end_index);
+            // the tick that will be used to figure out the timeout for this hole
+            let reference_tick = u64::from(Shred::reference_tick_from_data(
+                &db_iterator.value().expect("couldn't read value"),
+            ));
+            if ticks_since_first_insert < reference_tick + MAX_TURBINE_PROPAGATION_DELAY_TICKS {
+                // The higher index holes have not timed out yet
+                break 'outer;
+            }
             for i in prev_index..upper_index {
                 missing_indexes.push(i);
                 if missing_indexes.len() == max_missing {
@@ -1030,6 +1044,7 @@ impl Blocktree {
     pub fn find_missing_data_indexes(
         &self,
         slot: Slot,
+        first_timestamp: u64,
         start_index: u64,
         end_index: u64,
         max_missing: usize,
@@ -1041,6 +1056,7 @@ impl Blocktree {
         Self::find_missing_indexes::<cf::ShredData>(
             &mut db_iterator,
             slot,
+            first_timestamp,
             start_index,
             end_index,
             max_missing,
@@ -1305,10 +1321,17 @@ fn update_slot_meta(
     slot_meta: &mut SlotMeta,
     index: u32,
     new_consumed: u64,
+    reference_tick: u8,
 ) {
+    let maybe_first_insert = slot_meta.received == 0;
     // Index is zero-indexed, while the "received" height starts from 1,
     // so received = index + 1 for the same shred.
     slot_meta.received = cmp::max((u64::from(index) + 1) as u64, slot_meta.received);
+    if maybe_first_insert && slot_meta.received > 0 {
+        // predict the timestamp of what would have been the first shred in this slot
+        let slot_time_elapsed = u64::from(reference_tick) * 1000 / DEFAULT_TICKS_PER_SECOND;
+        slot_meta.first_shred_timestamp = timestamp() - slot_time_elapsed;
+    }
     slot_meta.consumed = new_consumed;
     slot_meta.last_index = {
         // If the last index in the slot hasn't been set before, then
@@ -3160,27 +3183,27 @@ pub mod tests {
         // range of [0, gap)
         let expected: Vec<u64> = (1..gap).collect();
         assert_eq!(
-            blocktree.find_missing_data_indexes(slot, 0, gap, gap as usize),
+            blocktree.find_missing_data_indexes(slot, 0, 0, gap, gap as usize),
             expected
         );
         assert_eq!(
-            blocktree.find_missing_data_indexes(slot, 1, gap, (gap - 1) as usize),
+            blocktree.find_missing_data_indexes(slot, 0, 1, gap, (gap - 1) as usize),
             expected,
         );
         assert_eq!(
-            blocktree.find_missing_data_indexes(slot, 0, gap - 1, (gap - 1) as usize),
+            blocktree.find_missing_data_indexes(slot, 0, 0, gap - 1, (gap - 1) as usize),
             &expected[..expected.len() - 1],
         );
         assert_eq!(
-            blocktree.find_missing_data_indexes(slot, gap - 2, gap, gap as usize),
+            blocktree.find_missing_data_indexes(slot, 0, gap - 2, gap, gap as usize),
             vec![gap - 2, gap - 1],
         );
         assert_eq!(
-            blocktree.find_missing_data_indexes(slot, gap - 2, gap, 1),
+            blocktree.find_missing_data_indexes(slot, 0, gap - 2, gap, 1),
             vec![gap - 2],
         );
         assert_eq!(
-            blocktree.find_missing_data_indexes(slot, 0, gap, 1),
+            blocktree.find_missing_data_indexes(slot, 0, 0, gap, 1),
             vec![1],
         );
@@ -3189,11 +3212,11 @@ pub mod tests {
        // Test with a range that encompasses a shred with index == gap which was
        // the last shred to be inserted
         let mut expected: Vec<u64> = (1..gap).collect();
         expected.push(gap + 1);
         assert_eq!(
-            blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap + 2) as usize),
+            blocktree.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap + 2) as usize),
             expected,
         );
         assert_eq!(
-            blocktree.find_missing_data_indexes(slot, 0, gap + 2, (gap - 1) as usize),
+            blocktree.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap - 1) as usize),
             &expected[..expected.len() - 1],
         );
@@ -3209,6 +3232,7 @@ pub mod tests {
                 assert_eq!(
                     blocktree.find_missing_data_indexes(
                         slot,
+                        0,
                         j * gap,
                         i * gap,
                         ((i - j) * gap) as usize
@@ -3222,6 +3246,34 @@ pub mod tests {
         drop(blocktree);
         Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
     }

+    #[test]
+    fn test_find_missing_data_indexes_timeout() {
+        let slot = 0;
+        let blocktree_path = get_tmp_ledger_path!();
+        let blocktree = Blocktree::open(&blocktree_path).unwrap();
+
+        // Write entries
+        let gap: u64 = 10;
+        let shreds: Vec<_> = (0..64)
+            .map(|i| Shred::new_from_data(slot, (i * gap) as u32, 0, None, false, false, i as u8))
+            .collect();
+        blocktree.insert_shreds(shreds, None).unwrap();
+
+        let empty: Vec<u64> = vec![];
+        assert_eq!(
+            blocktree.find_missing_data_indexes(slot, timestamp(), 0, 50, 1),
+            empty
+        );
+        let expected: Vec<_> = (1..=9).collect();
+        assert_eq!(
+            blocktree.find_missing_data_indexes(slot, timestamp() - 400, 0, 50, 9),
+            expected
+        );
+
+        drop(blocktree);
+        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
+    }
+
     #[test]
     fn test_find_missing_data_indexes_sanity() {
         let slot = 0;
@@ -3231,10 +3283,10 @@ pub mod tests {

         // Early exit conditions
         let empty: Vec<u64> = vec![];
-        assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 0, 1), empty);
-        assert_eq!(blocktree.find_missing_data_indexes(slot, 5, 5, 1), empty);
-        assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty);
-        assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty);
+        assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 0, 0, 1), empty);
+        assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 5, 5, 1), empty);
+        assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 4, 3, 1), empty);
+        assert_eq!(blocktree.find_missing_data_indexes(slot, 0, 1, 2, 0), empty);

         let entries = create_ticks(100, 0, Hash::default());
         let mut shreds = entries_to_test_shreds(entries, slot, 0, true);
@@ -3258,7 +3310,7 @@ pub mod tests {
         // [i, first_index - 1]
         for start in 0..STARTS {
             let result = blocktree.find_missing_data_indexes(
-                slot, start, // start
+                slot, 0, start, // start
                 END, //end
                 MAX, //max
             );
@@ -3288,7 +3340,7 @@ pub mod tests {
         for i in 0..num_shreds as u64 {
             for j in 0..i {
                 assert_eq!(
-                    blocktree.find_missing_data_indexes(slot, j, i, (i - j) as usize),
+                    blocktree.find_missing_data_indexes(slot, 0, j, i, (i - j) as usize),
                     empty
                 );
             }
         }
diff --git a/ledger/src/blocktree_meta.rs b/ledger/src/blocktree_meta.rs
index cf8bb72ad..6ca6dea7b 100644
--- a/ledger/src/blocktree_meta.rs
+++ b/ledger/src/blocktree_meta.rs
@@ -13,11 +13,13 @@ pub struct SlotMeta {
     // The total number of consecutive blobs starting from index 0
     // we have received for this slot.
     pub consumed: u64,
-    // The index *plus one* of the highest blob received for this slot. Useful
-    // for checking if the slot has received any blobs yet, and to calculate the
+    // The index *plus one* of the highest shred received for this slot. Useful
+    // for checking if the slot has received any shreds yet, and to calculate the
     // range where there is one or more holes: `(consumed..received)`.
     pub received: u64,
-    // The index of the blob that is flagged as the last blob for this slot.
+    // The timestamp of the first time a shred was added for this slot
+    pub first_shred_timestamp: u64,
+    // The index of the shred that is flagged as the last shred for this slot.
     pub last_index: u64,
     // The slot height of the block this one derives from.
     pub parent_slot: Slot,
@@ -32,7 +34,7 @@ pub struct SlotMeta {
 }

 #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
-/// Index recording presence/absence of blobs
+/// Index recording presence/absence of shreds
 pub struct Index {
     pub slot: Slot,
     data: DataIndex,
@@ -41,14 +43,14 @@ pub struct Index {

 #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
 pub struct DataIndex {
-    /// Map representing presence/absence of data blobs
+    /// Map representing presence/absence of data shreds
     index: BTreeSet<u64>,
 }

 #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
 /// Erasure coding information
 pub struct CodingIndex {
-    /// Map from set index, to hashmap from blob index to presence bool
+    /// Map from set index, to hashmap from shred index to presence bool
     index: BTreeSet<u64>,
 }

@@ -146,8 +148,8 @@ impl DataIndex {
 impl SlotMeta {
     pub fn is_full(&self) -> bool {
         // last_index is std::u64::MAX when it has no information about how
-        // many blobs will fill this slot.
-        // Note: A full slot with zero blobs is not possible.
+        // many shreds will fill this slot.
+        // Note: A full slot with zero shreds is not possible.
         if self.last_index == std::u64::MAX {
             return false;
         }
@@ -180,6 +182,7 @@ impl SlotMeta {
             slot,
             consumed: 0,
             received: 0,
+            first_shred_timestamp: 0,
             parent_slot,
             next_slots: vec![],
             is_connected: slot == 0,
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index ae2f9bcb1..642241fff 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -19,6 +19,7 @@ use solana_sdk::{
     pubkey::Pubkey,
     signature::{Keypair, KeypairUtil, Signature},
 };
+use std::mem::size_of;
 use std::{sync::Arc, time::Instant};

 /// The following constants are computed by hand, and hardcoded.
@@ -347,6 +348,11 @@ impl Shred {
         }
     }

+    pub fn reference_tick_from_data(data: &[u8]) -> u8 {
+        let flags = data[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER - size_of::<u8>()];
+        flags & SHRED_TICK_REFERENCE_MASK
+    }
+
     pub fn verify(&self, pubkey: &Pubkey) -> bool {
         self.signature()
             .verify(pubkey.as_ref(), &self.payload[SIZE_OF_SIGNATURE..])
@@ -963,6 +969,7 @@ pub mod tests {
         let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
         data_shreds.iter().for_each(|s| {
             assert_eq!(s.reference_tick(), 5);
+            assert_eq!(Shred::reference_tick_from_data(&s.payload), 5);
         });

         let deserialized_shred =
@@ -992,6 +999,10 @@ pub mod tests {
         let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
         data_shreds.iter().for_each(|s| {
             assert_eq!(s.reference_tick(), SHRED_TICK_REFERENCE_MASK);
+            assert_eq!(
+                Shred::reference_tick_from_data(&s.payload),
+                SHRED_TICK_REFERENCE_MASK
+            );
         });

         let deserialized_shred =
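
Taken together, the patch does two things: `update_slot_meta` back-dates a `first_shred_timestamp` for the slot from the reference tick of the first shred it inserts, and `find_missing_indexes` only reports a hole once enough ticks have elapsed since that timestamp to cover the hole's reference tick plus the new `MAX_TURBINE_PROPAGATION_DELAY_TICKS` allowance. Combined with the larger `MAX_REPAIR_LENGTH` (16 -> 1024) and the shorter `REPAIR_MS` interval (100 ms -> 50 ms), repair can request far more shreds per pass without asking peers for shreds that turbine is probably still delivering. The sketch below is a minimal standalone restatement of that gate, not the Blocktree implementation itself; `now_ms` stands in for `solana_sdk::timing::timestamp()`, and the constant values are assumptions rather than values taken from the diff.

```rust
// Hedged sketch of the hole-timeout logic added by this patch.
// Assumptions: DEFAULT_TICKS_PER_SECOND = 160 (assumed SDK default) and a
// local now_ms() in place of solana_sdk::timing::timestamp().

const DEFAULT_TICKS_PER_SECOND: u64 = 160; // assumed value of the SDK constant
const MAX_TURBINE_PROPAGATION_DELAY_TICKS: u64 = 16; // same value the patch introduces

fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

/// Estimate when the slot's first shred would have been produced, given the
/// reference tick carried by the first shred we actually inserted
/// (mirrors the new logic in update_slot_meta).
fn estimate_first_shred_timestamp(reference_tick: u8) -> u64 {
    let slot_time_elapsed_ms = u64::from(reference_tick) * 1000 / DEFAULT_TICKS_PER_SECOND;
    now_ms() - slot_time_elapsed_ms
}

/// Decide whether a hole sitting just below a shred carrying `reference_tick`
/// has waited long enough to be repaired (mirrors the gate in find_missing_indexes).
fn hole_timed_out(first_shred_timestamp_ms: u64, reference_tick: u8) -> bool {
    // Ticks elapsed since the (estimated) first shred of the slot arrived.
    let ticks_since_first_insert =
        DEFAULT_TICKS_PER_SECOND * now_ms().saturating_sub(first_shred_timestamp_ms) / 1000;
    // Only repair once turbine has had its propagation allowance past the reference tick.
    ticks_since_first_insert >= u64::from(reference_tick) + MAX_TURBINE_PROPAGATION_DELAY_TICKS
}

fn main() {
    // A slot whose first shred (reference tick 5) was inserted about a second ago:
    let first_ts = estimate_first_shred_timestamp(5).saturating_sub(1000);
    // A hole next to a shred from tick 20 becomes repairable only after the allowance elapses.
    println!("repair now? {}", hole_timed_out(first_ts, 20));
}
```

This is also why the updated tests sleep for one second (or pass a back-dated `first_timestamp`) before expecting holes to be reported: until the timeout elapses, `find_missing_data_indexes` deliberately returns nothing for the higher-index holes.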