//! The `repair_service` module implements the tools necessary to generate a thread which
//! regularly finds missing shreds in the ledger and sends repair requests for those shreds
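//!
//! At a high level, `RepairService::new` spawns a thread that loops roughly every
//! `REPAIR_MS` milliseconds: it re-roots the `RepairWeight` heuristic, folds in newly
//! verified votes, asks for the best weighted shred and orphan repairs, and sends the
//! resulting requests to peers over the repair UDP socket.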

use crate::{
    cluster_info::ClusterInfo,
    cluster_info_vote_listener::VerifiedVoteReceiver,
    cluster_slots::ClusterSlots,
    repair_weight::RepairWeight,
    repair_weighted_traversal::Contains,
    result::Result,
    serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE},
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use solana_ledger::{
    blockstore::{Blockstore, SlotMeta},
    shred::Nonce,
};
use solana_measure::measure::Measure;
use solana_runtime::{bank::Bank, bank_forks::BankForks, commitment::VOTE_THRESHOLD_SIZE};
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
use std::{
    collections::{HashMap, HashSet},
    iter::Iterator,
    net::SocketAddr,
    net::UdpSocket,
    sync::atomic::{AtomicBool, Ordering},
    sync::{Arc, RwLock},
    thread::sleep,
    thread::{self, Builder, JoinHandle},
    time::{Duration, Instant},
};

pub type DuplicateSlotsResetSender = CrossbeamSender<Slot>;
pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Slot>;

#[derive(Default, Debug)]
pub struct SlotRepairs {
    highest_shred_index: u64,
    // map from pubkey to total number of requests
    pubkey_repairs: HashMap<Pubkey, u64>,
}

#[derive(Default, Debug)]
pub struct RepairStatsGroup {
    pub count: u64,
    pub min: u64,
    pub max: u64,
    pub slot_pubkeys: HashMap<Slot, SlotRepairs>,
}

impl RepairStatsGroup {
    pub fn update(&mut self, repair_peer_id: &Pubkey, slot: Slot, shred_index: u64) {
        self.count += 1;
        let slot_repairs = self.slot_pubkeys.entry(slot).or_default();
        // Increment total number of repairs of this type for this pubkey by 1
        *slot_repairs
            .pubkey_repairs
            .entry(*repair_peer_id)
            .or_default() += 1;
        // Update the max requested shred index for this slot
        slot_repairs.highest_shred_index =
            std::cmp::max(slot_repairs.highest_shred_index, shred_index);
        // `min` defaults to 0, so seed it with the first slot seen instead of taking
        // min() against the default, which would pin it to 0 forever
        self.min = if self.min == 0 {
            slot
        } else {
            std::cmp::min(self.min, slot)
        };
        self.max = std::cmp::max(self.max, slot);
    }
}

#[derive(Default, Debug)]
pub struct RepairStats {
    pub shred: RepairStatsGroup,
    pub highest_shred: RepairStatsGroup,
    pub orphan: RepairStatsGroup,
    pub get_best_orphans_us: u64,
    pub get_best_shreds_us: u64,
}

#[derive(Default, Debug)]
pub struct RepairTiming {
    pub set_root_elapsed: u64,
    pub get_votes_elapsed: u64,
    pub add_votes_elapsed: u64,
    pub get_best_orphans_elapsed: u64,
    pub get_best_shreds_elapsed: u64,
    pub send_repairs_elapsed: u64,
}

impl RepairTiming {
    fn update(
        &mut self,
        set_root_elapsed: u64,
        get_votes_elapsed: u64,
        add_votes_elapsed: u64,
        send_repairs_elapsed: u64,
    ) {
        self.set_root_elapsed += set_root_elapsed;
        self.get_votes_elapsed += get_votes_elapsed;
        self.add_votes_elapsed += add_votes_elapsed;
        self.send_repairs_elapsed += send_repairs_elapsed;
    }
}
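
// Tunables for the repair loop: REPAIR_MS paces each iteration of `RepairService::run`,
// MAX_REPAIR_LENGTH caps how many shred/highest-shred repairs are requested per iteration,
// and MAX_ORPHANS caps orphan repairs. MAX_REPAIR_PER_DUPLICATE and MAX_DUPLICATE_WAIT_MS
// bound the (currently disabled) duplicate-slot repair path further below.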
pub const MAX_REPAIR_LENGTH: usize = 512;
pub const MAX_REPAIR_PER_DUPLICATE: usize = 20;
pub const MAX_DUPLICATE_WAIT_MS: usize = 10_000;
pub const REPAIR_MS: u64 = 100;
pub const MAX_ORPHANS: usize = 5;

pub struct RepairInfo {
    pub bank_forks: Arc<RwLock<BankForks>>,
    pub epoch_schedule: EpochSchedule,
    pub duplicate_slots_reset_sender: DuplicateSlotsResetSender,
    pub repair_validators: Option<HashSet<Pubkey>>,
}

pub struct RepairSlotRange {
    pub start: Slot,
    pub end: Slot,
}

impl Default for RepairSlotRange {
    fn default() -> Self {
        RepairSlotRange {
            start: 0,
            end: std::u64::MAX,
        }
    }
}
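
// Tracks an in-progress repair of a duplicate (dead but cluster-confirmed) slot: when a
// repair peer was last chosen and which single peer the slot is being pulled from.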
#[derive(Default, Clone)]
pub struct DuplicateSlotRepairStatus {
    start: u64,
    repair_pubkey_and_addr: Option<(Pubkey, SocketAddr)>,
}

pub struct RepairService {
    t_repair: JoinHandle<()>,
}

impl RepairService {
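    // A minimal construction sketch, assuming the caller has already wired up the
    // blockstore, cluster info, repair socket, and channels elsewhere in the validator:
    //
    //     let repair_service = RepairService::new(
    //         blockstore.clone(),
    //         exit.clone(),
    //         repair_socket.clone(),
    //         cluster_info.clone(),
    //         repair_info,
    //         cluster_slots.clone(),
    //         verified_vote_receiver,
    //     );
    //     // ...later, on shutdown:
    //     repair_service.join().unwrap();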
    pub fn new(
        blockstore: Arc<Blockstore>,
        exit: Arc<AtomicBool>,
        repair_socket: Arc<UdpSocket>,
        cluster_info: Arc<ClusterInfo>,
        repair_info: RepairInfo,
        cluster_slots: Arc<ClusterSlots>,
        verified_vote_receiver: VerifiedVoteReceiver,
    ) -> Self {
        let t_repair = Builder::new()
            .name("solana-repair-service".to_string())
            .spawn(move || {
                Self::run(
                    &blockstore,
                    &exit,
                    &repair_socket,
                    cluster_info,
                    repair_info,
                    &cluster_slots,
                    verified_vote_receiver,
                )
            })
            .unwrap();

        RepairService { t_repair }
    }

    fn run(
        blockstore: &Blockstore,
        exit: &AtomicBool,
        repair_socket: &UdpSocket,
        cluster_info: Arc<ClusterInfo>,
        repair_info: RepairInfo,
        cluster_slots: &ClusterSlots,
        verified_vote_receiver: VerifiedVoteReceiver,
    ) {
        let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
        let serve_repair = ServeRepair::new(cluster_info.clone());
        let id = cluster_info.id();
        let mut repair_stats = RepairStats::default();
        let mut repair_timing = RepairTiming::default();
        let mut last_stats = Instant::now();
        let duplicate_slot_repair_statuses: HashMap<Slot, DuplicateSlotRepairStatus> =
            HashMap::new();

        loop {
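            // Each iteration: (1) re-root and prune the repair weighting, (2) fold newly
            // verified votes into it, (3) ask `repair_weight` for the best shred and orphan
            // repairs, then (4) serialize and send each request over the repair socket.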
            if exit.load(Ordering::Relaxed) {
                break;
            }

            let mut set_root_elapsed;
            let mut get_votes_elapsed;
            let mut add_votes_elapsed;
            let repairs = {
                let root_bank = repair_info.bank_forks.read().unwrap().root_bank().clone();
                let new_root = root_bank.slot();

                // Purge outdated slots from the weighting heuristic
                set_root_elapsed = Measure::start("set_root_elapsed");
                repair_weight.set_root(new_root);
                set_root_elapsed.stop();

                // Add new votes to the weighting heuristic
                get_votes_elapsed = Measure::start("get_votes_elapsed");
                let mut slot_to_vote_pubkeys: HashMap<Slot, Vec<Pubkey>> = HashMap::new();
                verified_vote_receiver
                    .try_iter()
                    .for_each(|(vote_pubkey, vote_slots)| {
                        for slot in vote_slots {
                            slot_to_vote_pubkeys
                                .entry(slot)
                                .or_default()
                                .push(vote_pubkey);
                        }
                    });
                get_votes_elapsed.stop();

                add_votes_elapsed = Measure::start("add_votes");
                repair_weight.add_votes(
                    &blockstore,
                    slot_to_vote_pubkeys.into_iter(),
                    root_bank.epoch_stakes_map(),
                    root_bank.epoch_schedule(),
                );
                add_votes_elapsed.stop();
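                // NOTE: the duplicate-slot repair path below is currently disabled; the
                // helpers it calls are kept alive behind #[allow(dead_code)] further down.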
                /*let new_duplicate_slots = Self::find_new_duplicate_slots(
                    &duplicate_slot_repair_statuses,
                    blockstore,
                    cluster_slots,
                    &root_bank,
                );
                Self::process_new_duplicate_slots(
                    &new_duplicate_slots,
                    &mut duplicate_slot_repair_statuses,
                    cluster_slots,
                    &root_bank,
                    blockstore,
                    &serve_repair,
                    &repair_info.duplicate_slots_reset_sender,
                    &repair_info.repair_validators,
                );
                Self::generate_and_send_duplicate_repairs(
                    &mut duplicate_slot_repair_statuses,
                    cluster_slots,
                    blockstore,
                    &serve_repair,
                    &mut repair_stats,
                    &repair_socket,
                    &repair_info.repair_validators,
                );*/

                repair_weight.get_best_weighted_repairs(
                    blockstore,
                    root_bank.epoch_stakes_map(),
                    root_bank.epoch_schedule(),
                    MAX_ORPHANS,
                    MAX_REPAIR_LENGTH,
                    &duplicate_slot_repair_statuses,
                    Some(&mut repair_timing),
                )
            };

            let mut cache = HashMap::new();
            let mut send_repairs_elapsed = Measure::start("send_repairs_elapsed");
            repairs.into_iter().for_each(|repair_request| {
                if let Ok((to, req)) = serve_repair.repair_request(
                    &cluster_slots,
                    repair_request,
                    &mut cache,
                    &mut repair_stats,
                    &repair_info.repair_validators,
                ) {
                    repair_socket.send_to(&req, to).unwrap_or_else(|e| {
                        info!("{} repair req send_to({}) error {:?}", id, to, e);
                        0
                    });
                }
            });
            send_repairs_elapsed.stop();
            repair_timing.update(
                set_root_elapsed.as_us(),
                get_votes_elapsed.as_us(),
                add_votes_elapsed.as_us(),
                send_repairs_elapsed.as_us(),
            );

            if last_stats.elapsed().as_secs() > 2 {
                let repair_total = repair_stats.shred.count
                    + repair_stats.highest_shred.count
                    + repair_stats.orphan.count;
                info!("repair_stats: {:?}", repair_stats);
                if repair_total > 0 {
                    datapoint_info!(
                        "serve_repair-repair",
                        ("repair-total", repair_total, i64),
                        ("shred-count", repair_stats.shred.count, i64),
                        ("highest-shred-count", repair_stats.highest_shred.count, i64),
                        ("orphan-count", repair_stats.orphan.count, i64),
                        ("repair-highest-slot", repair_stats.highest_shred.max, i64),
                        ("repair-orphan", repair_stats.orphan.max, i64),
                    );
                }
                datapoint_info!(
                    "serve_repair-repair-timing",
                    ("set-root-elapsed", repair_timing.set_root_elapsed, i64),
                    ("get-votes-elapsed", repair_timing.get_votes_elapsed, i64),
                    ("add-votes-elapsed", repair_timing.add_votes_elapsed, i64),
                    (
                        "get-best-orphans-elapsed",
                        repair_timing.get_best_orphans_elapsed,
                        i64
                    ),
                    (
                        "get-best-shreds-elapsed",
                        repair_timing.get_best_shreds_elapsed,
                        i64
                    ),
                    (
                        "send-repairs-elapsed",
                        repair_timing.send_repairs_elapsed,
                        i64
                    ),
                );
                repair_stats = RepairStats::default();
                repair_timing = RepairTiming::default();
                last_stats = Instant::now();
            }
            sleep(Duration::from_millis(REPAIR_MS));
        }
    }

    /// Generate repairs for all slots `x` in the range
    /// `repair_range.start <= x <= repair_range.end`
    pub fn generate_repairs_in_range(
        blockstore: &Blockstore,
        max_repairs: usize,
        repair_range: &RepairSlotRange,
    ) -> Result<Vec<RepairType>> {
        // Slot height and shred indexes for shreds we want to repair
        let mut repairs: Vec<RepairType> = vec![];
        for slot in repair_range.start..=repair_range.end {
            if repairs.len() >= max_repairs {
                break;
            }

            let meta = blockstore
                .meta(slot)
                .expect("Unable to lookup slot meta")
                .unwrap_or(SlotMeta {
                    slot,
                    ..SlotMeta::default()
                });

            let new_repairs = Self::generate_repairs_for_slot(
                blockstore,
                slot,
                &meta,
                max_repairs - repairs.len(),
            );
            repairs.extend(new_repairs);
        }

        Ok(repairs)
    }
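
    // A slot needs one of three things: nothing if it is already full, a `HighestShred`
    // request when there are no holes below the highest received index (the slot just
    // needs to be extended), or one `Shred` request per missing data shred index otherwise.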
    pub fn generate_repairs_for_slot(
        blockstore: &Blockstore,
        slot: Slot,
        slot_meta: &SlotMeta,
        max_repairs: usize,
    ) -> Vec<RepairType> {
        if max_repairs == 0 || slot_meta.is_full() {
            vec![]
        } else if slot_meta.consumed == slot_meta.received {
            vec![RepairType::HighestShred(slot, slot_meta.received)]
        } else {
            let reqs = blockstore.find_missing_data_indexes(
                slot,
                slot_meta.first_shred_timestamp,
                slot_meta.consumed,
                slot_meta.received,
                max_repairs,
            );
            reqs.into_iter()
                .map(|i| RepairType::Shred(slot, i))
                .collect()
        }
    }

    /// Repairs any fork starting at the input slot
    pub fn generate_repairs_for_fork(
        blockstore: &Blockstore,
        repairs: &mut Vec<RepairType>,
        max_repairs: usize,
        slot: Slot,
        duplicate_slot_repair_statuses: &dyn Contains<Slot>,
    ) {
        let mut pending_slots = vec![slot];
        while repairs.len() < max_repairs && !pending_slots.is_empty() {
            let slot = pending_slots.pop().unwrap();
            if duplicate_slot_repair_statuses.contains(&slot) {
                // These are repaired through a different path
                continue;
            }
            if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
                let new_repairs = Self::generate_repairs_for_slot(
                    blockstore,
                    slot,
                    &slot_meta,
                    max_repairs - repairs.len(),
                );
                repairs.extend(new_repairs);
                let next_slots = slot_meta.next_slots;
                pending_slots.extend(next_slots);
            } else {
                break;
            }
        }
    }

    #[allow(dead_code)]
    fn generate_duplicate_repairs_for_slot(
        blockstore: &Blockstore,
        slot: Slot,
    ) -> Option<Vec<RepairType>> {
        if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
            if slot_meta.is_full() {
                // If the slot is full, no further need to repair this slot
                None
            } else {
                Some(Self::generate_repairs_for_slot(
                    blockstore,
                    slot,
                    &slot_meta,
                    MAX_REPAIR_PER_DUPLICATE,
                ))
            }
        } else {
            error!("Slot meta for duplicate slot does not exist, cannot generate repairs");
            // Filter out this slot from the set of duplicates to be repaired as
            // the SlotMeta has to exist for duplicates to be generated
            None
        }
    }

    #[allow(dead_code)]
    fn generate_and_send_duplicate_repairs(
        duplicate_slot_repair_statuses: &mut HashMap<Slot, DuplicateSlotRepairStatus>,
        cluster_slots: &ClusterSlots,
        blockstore: &Blockstore,
        serve_repair: &ServeRepair,
        repair_stats: &mut RepairStats,
        repair_socket: &UdpSocket,
        repair_validators: &Option<HashSet<Pubkey>>,
    ) {
        duplicate_slot_repair_statuses.retain(|slot, status| {
            Self::update_duplicate_slot_repair_addr(
                *slot,
                status,
                cluster_slots,
                serve_repair,
                repair_validators,
            );
            if let Some((repair_pubkey, repair_addr)) = status.repair_pubkey_and_addr {
                let repairs = Self::generate_duplicate_repairs_for_slot(&blockstore, *slot);

                if let Some(repairs) = repairs {
                    for repair_type in repairs {
                        if let Err(e) = Self::serialize_and_send_request(
                            &repair_type,
                            repair_socket,
                            &repair_pubkey,
                            &repair_addr,
                            serve_repair,
                            repair_stats,
                            DEFAULT_NONCE,
                        ) {
                            info!(
                                "repair req send_to {} ({}) error {:?}",
                                repair_pubkey, repair_addr, e
                            );
                        }
                    }
                    true
                } else {
                    false
                }
            } else {
                true
            }
        })
    }

    #[allow(dead_code)]
    fn serialize_and_send_request(
        repair_type: &RepairType,
        repair_socket: &UdpSocket,
        repair_pubkey: &Pubkey,
        to: &SocketAddr,
        serve_repair: &ServeRepair,
        repair_stats: &mut RepairStats,
        nonce: Nonce,
    ) -> Result<()> {
        let req =
            serve_repair.map_repair_request(&repair_type, repair_pubkey, repair_stats, nonce)?;
        repair_socket.send_to(&req, to)?;
        Ok(())
    }
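
    // Re-pick the repair peer for a duplicate slot when none is set yet, or when the
    // current peer has had MAX_DUPLICATE_WAIT_MS to serve the slot without success.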
    #[allow(dead_code)]
    fn update_duplicate_slot_repair_addr(
        slot: Slot,
        status: &mut DuplicateSlotRepairStatus,
        cluster_slots: &ClusterSlots,
        serve_repair: &ServeRepair,
        repair_validators: &Option<HashSet<Pubkey>>,
    ) {
        let now = timestamp();
        if status.repair_pubkey_and_addr.is_none()
            || now.saturating_sub(status.start) >= MAX_DUPLICATE_WAIT_MS as u64
        {
            let repair_pubkey_and_addr = serve_repair.repair_request_duplicate_compute_best_peer(
                slot,
                cluster_slots,
                repair_validators,
            );
            status.repair_pubkey_and_addr = repair_pubkey_and_addr.ok();
            status.start = timestamp();
        }
    }

    #[allow(dead_code)]
    fn process_new_duplicate_slots(
        new_duplicate_slots: &[Slot],
        duplicate_slot_repair_statuses: &mut HashMap<Slot, DuplicateSlotRepairStatus>,
        cluster_slots: &ClusterSlots,
        root_bank: &Bank,
        blockstore: &Blockstore,
        serve_repair: &ServeRepair,
        duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
        repair_validators: &Option<HashSet<Pubkey>>,
    ) {
        for slot in new_duplicate_slots {
            warn!(
                "Cluster completed slot: {}, dumping our current version and repairing",
                slot
            );
            // Clear the slot signatures from status cache for this slot
            root_bank.clear_slot_signatures(*slot);

            // Clear the accounts for this slot
            root_bank.remove_unrooted_slot(*slot);

            // Clear the slot-related data in blockstore. This will:
            // 1) Clear old shreds allowing new ones to be inserted
            // 2) Clear the "dead" flag allowing ReplayStage to start replaying
            //    this slot
            blockstore.clear_unconfirmed_slot(*slot);

            // Signal ReplayStage to clear its progress map so that a different
            // version of this slot can be replayed
            let _ = duplicate_slots_reset_sender.send(*slot);

            // Mark this slot as a special repair; try to download from a single
            // validator to avoid corruption
            let repair_pubkey_and_addr = serve_repair
                .repair_request_duplicate_compute_best_peer(*slot, cluster_slots, repair_validators)
                .ok();
            let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus {
                start: timestamp(),
                repair_pubkey_and_addr,
            };
            duplicate_slot_repair_statuses.insert(*slot, new_duplicate_slot_repair_status);
        }
    }
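
    // A dead slot becomes a duplicate-repair candidate only once the nodes reporting it
    // complete in ClusterSlots hold more than VOTE_THRESHOLD_SIZE of the epoch stake.
    // For instance, if the threshold were 2/3 and total stake were 100, completions from
    // nodes holding 67 stake would qualify the slot, while 60 would not.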
    #[allow(dead_code)]
    fn find_new_duplicate_slots(
        duplicate_slot_repair_statuses: &HashMap<Slot, DuplicateSlotRepairStatus>,
        blockstore: &Blockstore,
        cluster_slots: &ClusterSlots,
        root_bank: &Bank,
    ) -> Vec<Slot> {
        let dead_slots_iter = blockstore
            .dead_slots_iterator(root_bank.slot() + 1)
            .expect("Couldn't get dead slots iterator from blockstore");
        dead_slots_iter
            .filter_map(|dead_slot| {
                if let Some(status) = duplicate_slot_repair_statuses.get(&dead_slot) {
                    // Newly repaired version of this slot has been marked dead again,
                    // time to purge again
                    warn!(
                        "Repaired version of slot {} most recently (but maybe not entirely)
                        from {:?} has failed again",
                        dead_slot, status.repair_pubkey_and_addr
                    );
                }
                cluster_slots
                    .lookup(dead_slot)
                    .and_then(|completed_dead_slot_pubkeys| {
                        let epoch = root_bank.get_epoch_and_slot_index(dead_slot).0;
                        if let Some(epoch_stakes) = root_bank.epoch_stakes(epoch) {
                            let total_stake = epoch_stakes.total_stake();
                            let node_id_to_vote_accounts = epoch_stakes.node_id_to_vote_accounts();
                            let total_completed_slot_stake: u64 = completed_dead_slot_pubkeys
                                .read()
                                .unwrap()
                                .iter()
                                .map(|(node_key, _)| {
                                    node_id_to_vote_accounts
                                        .get(node_key)
                                        .map(|v| v.total_stake)
                                        .unwrap_or(0)
                                })
                                .sum();
                            if total_completed_slot_stake as f64 / total_stake as f64
                                > VOTE_THRESHOLD_SIZE
                            {
                                Some(dead_slot)
                            } else {
                                None
                            }
                        } else {
                            error!(
                                "Dead slot {} is too far ahead of root bank {}",
                                dead_slot,
                                root_bank.slot()
                            );
                            None
                        }
                    })
            })
            .collect()
    }

    pub fn join(self) -> thread::Result<()> {
        self.t_repair.join()
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::cluster_info::Node;
    use crossbeam_channel::unbounded;
    use solana_ledger::blockstore::{
        make_chaining_slot_entries, make_many_slot_entries, make_slot_entries,
    };
    use solana_ledger::shred::max_ticks_per_n_shreds;
    use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path};
    use solana_runtime::genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs};
    use solana_sdk::signature::Signer;
    use solana_vote_program::vote_transaction;
    use std::collections::HashSet;

    #[test]
    pub fn test_repair_orphan() {
        let blockstore_path = get_tmp_ledger_path!();
        {
            let blockstore = Blockstore::open(&blockstore_path).unwrap();

            // Create some orphan slots
            let (mut shreds, _) = make_slot_entries(1, 0, 1);
            let (shreds2, _) = make_slot_entries(5, 2, 1);
            shreds.extend(shreds2);
            blockstore.insert_shreds(shreds, None, false).unwrap();
            let mut repair_weight = RepairWeight::new(0);
            assert_eq!(
                repair_weight.get_best_weighted_repairs(
                    &blockstore,
                    &HashMap::new(),
                    &EpochSchedule::default(),
                    MAX_ORPHANS,
                    MAX_REPAIR_LENGTH,
                    &HashSet::default(),
                    None
                ),
                vec![RepairType::Orphan(2), RepairType::HighestShred(0, 0)]
            );
        }

        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }
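
    // A minimal sketch exercising only the `RepairStatsGroup::update` aggregation defined
    // above; the slot and shred-index values here are arbitrary.
    #[test]
    pub fn test_repair_stats_group_update() {
        let mut group = RepairStatsGroup::default();
        let peer = Pubkey::default();
        group.update(&peer, 10, 5);
        group.update(&peer, 10, 9);
        group.update(&peer, 12, 1);
        // Three updates total, spanning slots 10..=12
        assert_eq!(group.count, 3);
        assert_eq!(group.max, 12);
        // Highest shred index requested for slot 10 was 9
        let slot_repairs = group.slot_pubkeys.get(&10).unwrap();
        assert_eq!(slot_repairs.highest_shred_index, 9);
        // Both requests for slot 10 were attributed to `peer`
        assert_eq!(*slot_repairs.pubkey_repairs.get(&peer).unwrap(), 2);
    }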

    #[test]
    pub fn test_repair_empty_slot() {
        let blockstore_path = get_tmp_ledger_path!();
        {
            let blockstore = Blockstore::open(&blockstore_path).unwrap();

            let (shreds, _) = make_slot_entries(2, 0, 1);

            // Write this shred to slot 2, should chain to slot 0, which we haven't received
            // any shreds for
            blockstore.insert_shreds(shreds, None, false).unwrap();
            let mut repair_weight = RepairWeight::new(0);

            // Check that repair tries to patch the empty slot
            assert_eq!(
                repair_weight.get_best_weighted_repairs(
                    &blockstore,
                    &HashMap::new(),
                    &EpochSchedule::default(),
                    MAX_ORPHANS,
                    MAX_REPAIR_LENGTH,
                    &HashSet::default(),
                    None
                ),
                vec![RepairType::HighestShred(0, 0)]
            );
        }
        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }

    #[test]
    pub fn test_generate_repairs() {
        let blockstore_path = get_tmp_ledger_path!();
        {
            let blockstore = Blockstore::open(&blockstore_path).unwrap();

            let nth = 3;
            let num_slots = 2;

            // Create some shreds
            let (mut shreds, _) = make_many_slot_entries(0, num_slots as u64, 150 as u64);
            let num_shreds = shreds.len() as u64;
            let num_shreds_per_slot = num_shreds / num_slots;

            // write every nth shred
            let mut shreds_to_write = vec![];
            let mut missing_indexes_per_slot = vec![];
            for i in (0..num_shreds).rev() {
                let index = i % num_shreds_per_slot;
                if index % nth == 0 {
                    shreds_to_write.insert(0, shreds.remove(i as usize));
                } else if i < num_shreds_per_slot {
                    missing_indexes_per_slot.insert(0, index);
                }
            }
            blockstore
                .insert_shreds(shreds_to_write, None, false)
                .unwrap();
            // sleep so that the holes are ready for repair
            sleep(Duration::from_secs(1));
            let expected: Vec<RepairType> = (0..num_slots)
                .flat_map(|slot| {
                    missing_indexes_per_slot
                        .iter()
                        .map(move |shred_index| RepairType::Shred(slot as u64, *shred_index))
                })
                .collect();

            let mut repair_weight = RepairWeight::new(0);
            assert_eq!(
                repair_weight.get_best_weighted_repairs(
                    &blockstore,
                    &HashMap::new(),
                    &EpochSchedule::default(),
                    MAX_ORPHANS,
                    MAX_REPAIR_LENGTH,
                    &HashSet::default(),
                    None
                ),
                expected
            );

            assert_eq!(
                repair_weight.get_best_weighted_repairs(
                    &blockstore,
                    &HashMap::new(),
                    &EpochSchedule::default(),
                    MAX_ORPHANS,
                    expected.len() - 2,
                    &HashSet::default(),
                    None
                )[..],
                expected[0..expected.len() - 2]
            );
        }
        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }

    #[test]
    pub fn test_generate_highest_repair() {
        let blockstore_path = get_tmp_ledger_path!();
        {
            let blockstore = Blockstore::open(&blockstore_path).unwrap();

            let num_entries_per_slot = 100;

            // Create some shreds
            let (mut shreds, _) = make_slot_entries(0, 0, num_entries_per_slot as u64);
            let num_shreds_per_slot = shreds.len() as u64;

            // Remove last shred (which is also last in slot) so that slot is not complete
            shreds.pop();

            blockstore.insert_shreds(shreds, None, false).unwrap();

            // We didn't get the last shred for this slot, so ask for the highest shred for that slot
            let expected: Vec<RepairType> =
                vec![RepairType::HighestShred(0, num_shreds_per_slot - 1)];

            let mut repair_weight = RepairWeight::new(0);
            assert_eq!(
                repair_weight.get_best_weighted_repairs(
                    &blockstore,
                    &HashMap::new(),
                    &EpochSchedule::default(),
                    MAX_ORPHANS,
                    MAX_REPAIR_LENGTH,
                    &HashSet::default(),
                    None
                ),
                expected
            );
        }
        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }

    #[test]
    pub fn test_repair_range() {
        let blockstore_path = get_tmp_ledger_path!();
        {
            let blockstore = Blockstore::open(&blockstore_path).unwrap();

            let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
            let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;

            let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
            for (mut slot_shreds, _) in shreds.into_iter() {
                slot_shreds.remove(0);
                blockstore.insert_shreds(slot_shreds, None, false).unwrap();
            }
            // sleep to make slot eligible for repair
            sleep(Duration::from_secs(1));
            // Iterate through all possible combinations of start..end (inclusive on both
            // sides of the range)
            for start in 0..slots.len() {
                for end in start..slots.len() {
                    let mut repair_slot_range = RepairSlotRange::default();
                    repair_slot_range.start = slots[start];
                    repair_slot_range.end = slots[end];
                    let expected: Vec<RepairType> = (repair_slot_range.start
                        ..=repair_slot_range.end)
                        .map(|slot_index| {
                            if slots.contains(&(slot_index as u64)) {
                                RepairType::Shred(slot_index as u64, 0)
                            } else {
                                RepairType::HighestShred(slot_index as u64, 0)
                            }
                        })
                        .collect();

                    assert_eq!(
                        RepairService::generate_repairs_in_range(
                            &blockstore,
                            std::usize::MAX,
                            &repair_slot_range,
                        )
                        .unwrap(),
                        expected
                    );
                }
            }
        }
        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }

    #[test]
    pub fn test_repair_range_highest() {
        let blockstore_path = get_tmp_ledger_path!();
        {
            let blockstore = Blockstore::open(&blockstore_path).unwrap();

            let num_entries_per_slot = 10;

            let num_slots = 1;
            let start = 5;

            // Create some shreds in slots start..start + num_slots
            for i in start..start + num_slots {
                let parent = if i > 0 { i - 1 } else { 0 };
                let (shreds, _) = make_slot_entries(i, parent, num_entries_per_slot as u64);

                blockstore.insert_shreds(shreds, None, false).unwrap();
            }

            let end = 4;
            let expected: Vec<RepairType> = vec![
                RepairType::HighestShred(end - 2, 0),
                RepairType::HighestShred(end - 1, 0),
                RepairType::HighestShred(end, 0),
            ];

            let mut repair_slot_range = RepairSlotRange::default();
            repair_slot_range.start = 2;
            repair_slot_range.end = end;

            assert_eq!(
                RepairService::generate_repairs_in_range(
                    &blockstore,
                    std::usize::MAX,
                    &repair_slot_range,
                )
                .unwrap(),
                expected
            );
        }
        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
    }

    #[test]
    pub fn test_generate_duplicate_repairs_for_slot() {
        let blockstore_path = get_tmp_ledger_path!();
        let blockstore = Blockstore::open(&blockstore_path).unwrap();
        let dead_slot = 9;

        // SlotMeta doesn't exist, should make no repairs
        assert!(
            RepairService::generate_duplicate_repairs_for_slot(&blockstore, dead_slot).is_none()
        );

        // Insert some shreds to create a SlotMeta, should make repairs
        let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;
        let (mut shreds, _) = make_slot_entries(dead_slot, dead_slot - 1, num_entries_per_slot);
        blockstore
            .insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false)
            .unwrap();
        assert!(
            RepairService::generate_duplicate_repairs_for_slot(&blockstore, dead_slot).is_some()
        );

        // SlotMeta is full, should make no repairs
        blockstore
            .insert_shreds(vec![shreds.pop().unwrap()], None, false)
            .unwrap();
        assert!(
            RepairService::generate_duplicate_repairs_for_slot(&blockstore, dead_slot).is_none()
        );
    }

    #[test]
    pub fn test_generate_and_send_duplicate_repairs() {
        let blockstore_path = get_tmp_ledger_path!();
        let blockstore = Blockstore::open(&blockstore_path).unwrap();
        let cluster_slots = ClusterSlots::default();
        let serve_repair = ServeRepair::new_with_invalid_keypair(Node::new_localhost().info);
        let mut duplicate_slot_repair_statuses = HashMap::new();
        let dead_slot = 9;
        let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
        let duplicate_status = DuplicateSlotRepairStatus {
            start: std::u64::MAX,
            repair_pubkey_and_addr: None,
        };

        // Insert some shreds to create a SlotMeta
        let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;
        let (mut shreds, _) = make_slot_entries(dead_slot, dead_slot - 1, num_entries_per_slot);
        blockstore
            .insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false)
            .unwrap();

        duplicate_slot_repair_statuses.insert(dead_slot, duplicate_status);

        // There is no repair_addr, so should not get filtered because the timeout
        // `std::u64::MAX` has not expired
        RepairService::generate_and_send_duplicate_repairs(
            &mut duplicate_slot_repair_statuses,
            &cluster_slots,
            &blockstore,
            &serve_repair,
            &mut RepairStats::default(),
            &UdpSocket::bind("0.0.0.0:0").unwrap(),
            &None,
        );
        assert!(duplicate_slot_repair_statuses
            .get(&dead_slot)
            .unwrap()
            .repair_pubkey_and_addr
            .is_none());
        assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());

        // Give the slot a repair address
        duplicate_slot_repair_statuses
            .get_mut(&dead_slot)
            .unwrap()
            .repair_pubkey_and_addr =
            Some((Pubkey::default(), receive_socket.local_addr().unwrap()));

        // Slot is not yet full, should not get filtered from `duplicate_slot_repair_statuses`
        RepairService::generate_and_send_duplicate_repairs(
            &mut duplicate_slot_repair_statuses,
            &cluster_slots,
            &blockstore,
            &serve_repair,
            &mut RepairStats::default(),
            &UdpSocket::bind("0.0.0.0:0").unwrap(),
            &None,
        );
        assert_eq!(duplicate_slot_repair_statuses.len(), 1);
        assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());

        // Insert rest of shreds. Slot is full, should get filtered from
        // `duplicate_slot_repair_statuses`
        blockstore
            .insert_shreds(vec![shreds.pop().unwrap()], None, false)
            .unwrap();
        RepairService::generate_and_send_duplicate_repairs(
            &mut duplicate_slot_repair_statuses,
            &cluster_slots,
            &blockstore,
            &serve_repair,
            &mut RepairStats::default(),
            &UdpSocket::bind("0.0.0.0:0").unwrap(),
            &None,
        );
        assert!(duplicate_slot_repair_statuses.is_empty());
    }

    #[test]
    pub fn test_update_duplicate_slot_repair_addr() {
        let dummy_addr = Some((
            Pubkey::default(),
            UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap(),
        ));
        let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(
            Node::new_localhost().info,
        ));
        let serve_repair = ServeRepair::new(cluster_info.clone());
        let valid_repair_peer = Node::new_localhost().info;

        // Signal that this peer has completed the dead slot, and is thus
        // a valid target for repair
        let dead_slot = 9;
        let cluster_slots = ClusterSlots::default();
        cluster_slots.insert_node_id(dead_slot, Arc::new(valid_repair_peer.id));
        cluster_info.insert_info(valid_repair_peer);

        // Not enough time has passed, should not update the
        // address
        let mut duplicate_status = DuplicateSlotRepairStatus {
            start: std::u64::MAX,
            repair_pubkey_and_addr: dummy_addr,
        };
        RepairService::update_duplicate_slot_repair_addr(
            dead_slot,
            &mut duplicate_status,
            &cluster_slots,
            &serve_repair,
            &None,
        );
        assert_eq!(duplicate_status.repair_pubkey_and_addr, dummy_addr);

        // If the repair address is None, should try to update
        let mut duplicate_status = DuplicateSlotRepairStatus {
            start: std::u64::MAX,
            repair_pubkey_and_addr: None,
        };
        RepairService::update_duplicate_slot_repair_addr(
            dead_slot,
            &mut duplicate_status,
            &cluster_slots,
            &serve_repair,
            &None,
        );
        assert!(duplicate_status.repair_pubkey_and_addr.is_some());

        // If sufficient time has passed, should try to update
        let mut duplicate_status = DuplicateSlotRepairStatus {
            start: timestamp() - MAX_DUPLICATE_WAIT_MS as u64,
            repair_pubkey_and_addr: dummy_addr,
        };
        RepairService::update_duplicate_slot_repair_addr(
            dead_slot,
            &mut duplicate_status,
            &cluster_slots,
            &serve_repair,
            &None,
        );
        assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
    }

    #[test]
    pub fn test_process_new_duplicate_slots() {
        let blockstore_path = get_tmp_ledger_path!();
        let blockstore = Blockstore::open(&blockstore_path).unwrap();
        let cluster_slots = ClusterSlots::default();
        let serve_repair = ServeRepair::new_with_invalid_keypair(Node::new_localhost().info);
        let mut duplicate_slot_repair_statuses = HashMap::new();
        let duplicate_slot = 9;

        // Fill blockstore for dead slot
        blockstore.set_dead_slot(duplicate_slot).unwrap();
        assert!(blockstore.is_dead(duplicate_slot));
        let (shreds, _) = make_slot_entries(duplicate_slot, 0, 1);
        blockstore.insert_shreds(shreds, None, false).unwrap();

        let keypairs = ValidatorVoteKeypairs::new_rand();
        let (reset_sender, reset_receiver) = unbounded();
        let GenesisConfigInfo {
            genesis_config,
            mint_keypair,
            ..
        } = genesis_utils::create_genesis_config_with_vote_accounts(
            1_000_000_000,
            &[&keypairs],
            vec![10000],
        );
        let bank0 = Arc::new(Bank::new(&genesis_config));
        let bank9 = Bank::new_from_parent(&bank0, &Pubkey::default(), duplicate_slot);
        let old_balance = bank9.get_balance(&keypairs.node_keypair.pubkey());
        bank9
            .transfer(10_000, &mint_keypair, &keypairs.node_keypair.pubkey())
            .unwrap();
        let vote_tx = vote_transaction::new_vote_transaction(
            vec![0],
            bank0.hash(),
            bank0.last_blockhash(),
            &keypairs.node_keypair,
            &keypairs.vote_keypair,
            &keypairs.vote_keypair,
            None,
        );
        bank9.process_transaction(&vote_tx).unwrap();
        assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_some());

        RepairService::process_new_duplicate_slots(
            &[duplicate_slot],
            &mut duplicate_slot_repair_statuses,
            &cluster_slots,
            &bank9,
            &blockstore,
            &serve_repair,
            &reset_sender,
            &None,
        );

        // Blockstore should have been cleared
        assert!(!blockstore.is_dead(duplicate_slot));

        // Should not be able to find signature for slot 9 for the tx
        assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_none());

        // Getting balance should return the old balance (accounts were cleared)
        assert_eq!(
            bank9.get_balance(&keypairs.node_keypair.pubkey()),
            old_balance
        );

        // Should add the duplicate slot to the tracker
        assert!(duplicate_slot_repair_statuses
            .get(&duplicate_slot)
            .is_some());

        // A signal should be sent to clear ReplayStage
        assert!(reset_receiver.try_recv().is_ok());
    }

    #[test]
    pub fn test_find_new_duplicate_slots() {
        let blockstore_path = get_tmp_ledger_path!();
        let blockstore = Blockstore::open(&blockstore_path).unwrap();
        let cluster_slots = ClusterSlots::default();
        let duplicate_slot_repair_statuses = HashMap::new();
        let keypairs = ValidatorVoteKeypairs::new_rand();
        let only_node_id = Arc::new(keypairs.node_keypair.pubkey());
        let GenesisConfigInfo { genesis_config, .. } =
            genesis_utils::create_genesis_config_with_vote_accounts(
                1_000_000_000,
                &[keypairs],
                vec![100],
            );
        let bank0 = Bank::new(&genesis_config);

        // Empty blockstore should have no duplicates
        assert!(RepairService::find_new_duplicate_slots(
            &duplicate_slot_repair_statuses,
            &blockstore,
            &cluster_slots,
            &bank0,
        )
        .is_empty());

        // Insert a dead slot; it is not confirmed by the network, so it should not
        // be marked as a duplicate
        let dead_slot = 9;
        blockstore.set_dead_slot(dead_slot).unwrap();
        assert!(RepairService::find_new_duplicate_slots(
            &duplicate_slot_repair_statuses,
            &blockstore,
            &cluster_slots,
            &bank0,
        )
        .is_empty());

        // If supermajority confirms the slot, then the dead slot should be
        // marked as a duplicate that needs to be repaired
        cluster_slots.insert_node_id(dead_slot, only_node_id);
        assert_eq!(
            RepairService::find_new_duplicate_slots(
                &duplicate_slot_repair_statuses,
                &blockstore,
                &cluster_slots,
                &bank0,
            ),
            vec![dead_slot]
        );
    }
}