Introduce AncestorHashesService (#18812)

carllin 2021-07-23 16:54:47 -07:00 committed by GitHub
parent f4aa5c5d8d
commit 1ee64afb12
9 changed files with 499 additions and 73 deletions

Cargo.lock (generated)

@ -4556,6 +4556,7 @@ dependencies = [
"byteorder",
"chrono",
"crossbeam-channel",
"dashmap",
"ed25519-dalek",
"flate2",
"fs_extra",

core/Cargo.toml

@ -23,6 +23,7 @@ bs58 = "0.4.0"
byteorder = "1.4.3"
chrono = { version = "0.4.11", features = ["serde"] }
crossbeam-channel = "0.5"
+dashmap = { version = "4.0.2", features = ["rayon", "raw-api"] }
ed25519-dalek = "=1.0.1"
fs_extra = "1.2.0"
flate2 = "1.0"

core/src/ancestor_hashes_service.rs (new file)

@ -0,0 +1,364 @@
use crate::{
duplicate_repair_status::DeadSlotAncestorRequestStatus,
outstanding_requests::OutstandingRequests,
repair_response::{self},
repair_service::{DuplicateSlotsResetSender, RepairInfo, RepairStatsGroup},
result::{Error, Result},
serve_repair::AncestorHashesRepairType,
};
use dashmap::{mapref::entry::Entry::Occupied, DashMap};
use solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE};
use solana_measure::measure::Measure;
use solana_perf::{packet::limited_deserialize, recycler::Recycler};
use solana_sdk::{
clock::{Slot, SLOT_MS},
timing::timestamp,
};
use solana_streamer::{
packet::Packets,
streamer::{self, PacketReceiver},
};
use std::{
net::UdpSocket,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::channel,
{Arc, RwLock},
},
thread::{self, sleep, Builder, JoinHandle},
time::{Duration, Instant},
};
pub const MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND: usize = 2;
type OutstandingAncestorHashesRepairs = OutstandingRequests<AncestorHashesRepairType>;
#[derive(Default)]
pub struct AncestorHashesResponsesStats {
pub total_packets: usize,
pub dropped_packets: usize,
pub invalid_packets: usize,
pub processed: usize,
}
impl AncestorHashesResponsesStats {
fn report(&mut self) {
inc_new_counter_info!(
"ancestor_hashes_responses-total_packets",
self.total_packets
);
inc_new_counter_info!("ancestor_hashes_responses-processed", self.processed);
inc_new_counter_info!(
"ancestor_hashes_responses-dropped_packets",
self.dropped_packets
);
inc_new_counter_info!(
"ancestor_hashes_responses-invalid_packets",
self.invalid_packets
);
*self = AncestorHashesResponsesStats::default();
}
}
pub struct AncestorRepairRequestsStats {
pub ancestor_requests: RepairStatsGroup,
last_report: Instant,
}
impl Default for AncestorRepairRequestsStats {
fn default() -> Self {
AncestorRepairRequestsStats {
ancestor_requests: RepairStatsGroup::default(),
last_report: Instant::now(),
}
}
}
impl AncestorRepairRequestsStats {
fn report(&mut self) {
let slot_to_count: Vec<_> = self
.ancestor_requests
.slot_pubkeys
.iter()
.map(|(slot, slot_repairs)| {
(
slot,
slot_repairs
.pubkey_repairs()
.iter()
.map(|(_key, count)| count)
.sum::<u64>(),
)
})
.collect();
let repair_total = self.ancestor_requests.count;
if self.last_report.elapsed().as_secs() > 2 && repair_total > 0 {
info!("ancestor_repair_requests_stats: {:?}", slot_to_count);
datapoint_info!(
"ancestor-repair",
("ancestor-repair-count", self.ancestor_requests.count, i64)
);
*self = AncestorRepairRequestsStats::default();
}
}
}
pub struct AncestorHashesService {
thread_hdls: Vec<JoinHandle<()>>,
}
impl AncestorHashesService {
pub fn new(
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
ancestor_hashes_request_socket: Arc<UdpSocket>,
repair_info: RepairInfo,
) -> Self {
let outstanding_requests: Arc<RwLock<OutstandingAncestorHashesRepairs>> =
Arc::new(RwLock::new(OutstandingAncestorHashesRepairs::default()));
let (response_sender, response_receiver) = channel();
let t_receiver = streamer::receiver(
ancestor_hashes_request_socket.clone(),
&exit,
response_sender,
Recycler::default(),
"ancestor_hashes_response_receiver",
1,
false,
);
// Listen for responses to our ancestor requests
let ancestor_hashes_request_statuses: Arc<DashMap<Slot, DeadSlotAncestorRequestStatus>> =
Arc::new(DashMap::new());
let t_ancestor_hashes_responses = Self::run_responses_listener(
ancestor_hashes_request_statuses.clone(),
response_receiver,
blockstore,
outstanding_requests.clone(),
exit.clone(),
repair_info.duplicate_slots_reset_sender.clone(),
);
// Generate ancestor requests for dead slots that are repairable
let t_ancestor_requests = Self::run_find_repairable_dead_slots(
ancestor_hashes_request_statuses,
ancestor_hashes_request_socket,
repair_info,
outstanding_requests,
exit,
);
let thread_hdls = vec![t_receiver, t_ancestor_hashes_responses, t_ancestor_requests];
Self { thread_hdls }
}
pub fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Ok(())
}
/// Listen for responses to our ancestor hashes repair requests
fn run_responses_listener(
ancestor_hashes_request_statuses: Arc<DashMap<Slot, DeadSlotAncestorRequestStatus>>,
response_receiver: PacketReceiver,
blockstore: Arc<Blockstore>,
outstanding_requests: Arc<RwLock<OutstandingAncestorHashesRepairs>>,
exit: Arc<AtomicBool>,
duplicate_slots_reset_sender: DuplicateSlotsResetSender,
) -> JoinHandle<()> {
Builder::new()
.name("solana-ancestor-hashes-responses-service".to_string())
.spawn(move || {
let mut last_stats_report = Instant::now();
let mut stats = AncestorHashesResponsesStats::default();
let mut max_packets = 1024;
loop {
let result = Self::process_new_responses(
&ancestor_hashes_request_statuses,
&response_receiver,
&blockstore,
&outstanding_requests,
&mut stats,
&mut max_packets,
&duplicate_slots_reset_sender,
);
match result {
Err(Error::RecvTimeout(_)) | Ok(_) => {}
Err(err) => info!("ancestor hashes responses listener error: {:?}", err),
};
if exit.load(Ordering::Relaxed) {
return;
}
if last_stats_report.elapsed().as_secs() > 2 {
stats.report();
last_stats_report = Instant::now();
}
}
})
.unwrap()
}
/// Process messages from the network
fn process_new_responses(
ancestor_hashes_request_statuses: &DashMap<Slot, DeadSlotAncestorRequestStatus>,
response_receiver: &PacketReceiver,
blockstore: &Blockstore,
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
stats: &mut AncestorHashesResponsesStats,
max_packets: &mut usize,
duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
) -> Result<()> {
let timeout = Duration::new(1, 0);
let mut responses = vec![response_receiver.recv_timeout(timeout)?];
let mut total_packets = responses[0].packets.len();
let mut dropped_packets = 0;
while let Ok(more) = response_receiver.try_recv() {
total_packets += more.packets.len();
if total_packets < *max_packets {
// Still under the packet budget; anything beyond it is dropped below to guard against DOS
responses.push(more);
} else {
dropped_packets += more.packets.len();
}
}
stats.dropped_packets += dropped_packets;
stats.total_packets += total_packets;
let mut time = Measure::start("ancestor_hashes::handle_packets");
for response in responses {
Self::handle_packets(
ancestor_hashes_request_statuses,
response,
stats,
outstanding_requests,
blockstore,
duplicate_slots_reset_sender,
);
}
time.stop();
if total_packets >= *max_packets {
if time.as_ms() > 1000 {
*max_packets = (*max_packets * 9) / 10;
} else {
*max_packets = (*max_packets * 10) / 9;
}
}
Ok(())
}
fn handle_packets(
ancestor_hashes_request_statuses: &DashMap<Slot, DeadSlotAncestorRequestStatus>,
packets: Packets,
stats: &mut AncestorHashesResponsesStats,
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
blockstore: &Blockstore,
duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
) {
// Iterate over the packets in this batch
packets.packets.iter().for_each(|packet| {
let from_addr = packet.meta.addr();
if let Ok(ancestor_hashes_response) =
limited_deserialize(&packet.data[..packet.meta.size - SIZE_OF_NONCE])
{
// Verify the response
let request_slot = repair_response::nonce(&packet.data[..packet.meta.size])
.and_then(|nonce| {
outstanding_requests.write().unwrap().register_response(
nonce,
&ancestor_hashes_response,
timestamp(),
// If the response is valid, return the slot the request
// was for
|ancestor_hashes_request| ancestor_hashes_request.0,
)
});
if request_slot.is_none() {
stats.invalid_packets += 1;
return;
}
// If it was a valid response, there must be a valid `request_slot`
let request_slot = request_slot.unwrap();
stats.processed += 1;
// Check if we can make any decisions.
if let Occupied(mut ancestor_hashes_status_ref) =
ancestor_hashes_request_statuses.entry(request_slot)
{
if let Some(decision) = ancestor_hashes_status_ref.get_mut().add_response(
&from_addr,
ancestor_hashes_response.into_slot_hashes(),
blockstore,
) {
let potential_slots_to_dump = {
// TODO: Handle DuplicateAncestorDecision::ContinueSearch. That
// decision means all the returned ancestors were mismatched, so
// the earliest mismatched ancestor has yet to be found.
//
// Even in the best case, after ReplayStage dumps and repairs the
// earliest known ancestor `A` here, we may still hold an incorrect
// version of some ancestor of `A`, so `A` will be marked dead again
// and the search protocol will continue through another round of
// ancestor repairs.
//
// However, this process is slow, so ideally the protocol could be
// extended to keep searching by issuing another ancestor repair
// request starting from the earliest ancestor returned by this
// search.
decision
.repair_status()
.map(|status| status.correct_ancestors_to_repair.clone())
};
let mut did_send_replay_correct_ancestors = false;
if let Some(potential_slots_to_dump) = potential_slots_to_dump {
// Signal ReplayStage to dump the fork that is descended from
// `earliest_mismatched_slot_to_dump`.
if !potential_slots_to_dump.is_empty() {
did_send_replay_correct_ancestors = true;
let _ = duplicate_slots_reset_sender.send(potential_slots_to_dump);
}
}
if !did_send_replay_correct_ancestors {
// If nothing is going to be dumped + repaired, then we can remove
// this slot from `ancestor_hashes_request_statuses` since the
// dead flag won't be cleared from blockstore, so the
// `ancestor_hashes_request_statuses.retain()` in
// `Self::run_find_repairable_dead_slots()` won't clear
// this slot
ancestor_hashes_status_ref.remove();
}
}
}
}
});
}
fn run_find_repairable_dead_slots(
_ancestor_hashes_request_statuses: Arc<DashMap<Slot, DeadSlotAncestorRequestStatus>>,
_ancestor_hashes_request_socket: Arc<UdpSocket>,
_repair_info: RepairInfo,
_outstanding_requests: Arc<RwLock<OutstandingAncestorHashesRepairs>>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let mut repair_stats = AncestorRepairRequestsStats::default();
Builder::new()
.name("solana-find-repairable-dead-slots".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
return;
}
repair_stats.report();
sleep(Duration::from_millis(SLOT_MS));
})
.unwrap()
}
}
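
Two implementation details in the new service above are worth spelling out. First, `process_new_responses` adapts its per-iteration packet budget multiplicatively: when a saturated batch takes more than a second to handle, `max_packets` shrinks by about 10% (1024 -> 921), otherwise it grows by about 11% (1024 -> 1137). Second, `handle_packets` leans on DashMap's entry API: it takes an `Occupied` entry guard for the request slot, mutates the status in place, and removes the entry through the same guard once a decision is reached, so there is no separate lookup-then-delete step. A minimal sketch of that entry-guard pattern, with illustrative types standing in for `DeadSlotAncestorRequestStatus`:

use dashmap::{mapref::entry::Entry::Occupied, DashMap};

fn record_and_maybe_finish(statuses: &DashMap<u64, Vec<u64>>, slot: u64, response: u64) {
    if let Occupied(mut entry) = statuses.entry(slot) {
        // Mutate the stored status in place while holding the entry guard.
        entry.get_mut().push(response);
        // Remove through the same guard once a decision is reached, so no
        // second lookup is needed between deciding and deleting.
        if entry.get().len() >= 3 {
            entry.remove();
        }
    }
}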

core/src/lib.rs

@ -8,6 +8,7 @@
//!
pub mod accounts_hash_verifier;
+pub mod ancestor_hashes_service;
pub mod banking_stage;
pub mod broadcast_stage;
pub mod cache_block_meta_service;

core/src/outstanding_requests.rs

@ -29,8 +29,15 @@ where
nonce
}
-pub fn register_response(&mut self, nonce: u32, response: &S, now: u64) -> bool {
-let (is_valid, should_delete) = self
+pub fn register_response<R>(
+&mut self,
+nonce: u32,
+response: &S,
+now: u64,
+// runs if the response was valid
+success_fn: impl Fn(&T) -> R,
+) -> Option<R> {
+let (response, should_delete) = self
.requests
.get_mut(&nonce)
.map(|status| {
@ -39,12 +46,15 @@ where
&& status.request.verify_response(response)
{
status.num_expected_responses -= 1;
-(true, status.num_expected_responses == 0)
+(
+Some(success_fn(&status.request)),
+status.num_expected_responses == 0,
+)
} else {
-(false, true)
+(None, true)
}
})
-.unwrap_or((false, false));
+.unwrap_or((None, false));
if should_delete {
self.requests
@ -52,7 +62,7 @@ where
.expect("Delete must delete existing object");
}
-is_valid
+response
}
}
@ -103,7 +113,9 @@ pub(crate) mod tests {
.unwrap()
.expire_timestamp;
-assert!(!outstanding_requests.register_response(nonce, &shred, expire_timestamp + 1));
+assert!(outstanding_requests
+.register_response(nonce, &shred, expire_timestamp + 1, |_| ())
+.is_none());
assert!(outstanding_requests.requests.get(&nonce).is_none());
}
@ -127,7 +139,9 @@ pub(crate) mod tests {
assert!(num_expected_responses > 1);
// Response that passes all checks should decrease num_expected_responses
-assert!(outstanding_requests.register_response(nonce, &shred, expire_timestamp - 1));
+assert!(outstanding_requests
+.register_response(nonce, &shred, expire_timestamp - 1, |_| ())
+.is_some());
num_expected_responses -= 1;
assert_eq!(
outstanding_requests
@ -139,8 +153,12 @@ pub(crate) mod tests {
);
// Response with incorrect nonce is ignored
-assert!(!outstanding_requests.register_response(nonce + 1, &shred, expire_timestamp - 1));
-assert!(!outstanding_requests.register_response(nonce + 1, &shred, expire_timestamp));
+assert!(outstanding_requests
+.register_response(nonce + 1, &shred, expire_timestamp - 1, |_| ())
+.is_none());
+assert!(outstanding_requests
+.register_response(nonce + 1, &shred, expire_timestamp, |_| ())
+.is_none());
assert_eq!(
outstanding_requests
.requests
@ -152,7 +170,9 @@ pub(crate) mod tests {
// Response with timestamp over limit should remove status, preventing late
// responses from being accepted
-assert!(!outstanding_requests.register_response(nonce, &shred, expire_timestamp));
+assert!(outstanding_requests
+.register_response(nonce, &shred, expire_timestamp, |_| ())
+.is_none());
assert!(outstanding_requests.requests.get(&nonce).is_none());
// If number of outstanding requests hits zero, should also remove the entry
@ -170,7 +190,9 @@ pub(crate) mod tests {
assert!(num_expected_responses > 1);
for _ in 0..num_expected_responses {
assert!(outstanding_requests.requests.get(&nonce).is_some());
-assert!(outstanding_requests.register_response(nonce, &shred, expire_timestamp - 1));
+assert!(outstanding_requests
+.register_response(nonce, &shred, expire_timestamp - 1, |_| ())
+.is_some());
}
assert!(outstanding_requests.requests.get(&nonce).is_none());
}
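
The change above generalizes `register_response` from a boolean to `Option<R>`: on a verified response, the caller-supplied `success_fn` runs against the stored request and its result is returned. A hedged sketch of the two calling styles this commit uses, where `nonce`, `shred`, and `response` are illustrative values and `timestamp()` is `solana_sdk::timing::timestamp`:

// Validity-only callers (as in window_service below) pass a unit closure:
let is_valid = outstanding_shred_repairs
    .write()
    .unwrap()
    .register_response(nonce, &shred, timestamp(), |_| ())
    .is_some();

// The ancestor hashes path instead recovers the slot the request was issued
// for, via the `AncestorHashesRepairType(pub Slot)` tuple field:
let request_slot: Option<Slot> = outstanding_ancestor_requests
    .write()
    .unwrap()
    .register_response(nonce, &response, timestamp(), |request| request.0);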

core/src/repair_service.rs

@ -1,6 +1,7 @@
//! The `repair_service` module implements the tools necessary to generate a thread which
//! regularly finds missing shreds in the ledger and sends repair requests for those shreds
use crate::{
ancestor_hashes_service::AncestorHashesService,
cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_slots::ClusterSlots,
duplicate_repair_status::DuplicateSlotRepairStatus,
@ -33,8 +34,8 @@ use std::{
time::{Duration, Instant},
};
-pub type DuplicateSlotsResetSender = CrossbeamSender<Slot>;
-pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Slot>;
+pub type DuplicateSlotsResetSender = CrossbeamSender<Vec<(Slot, Hash)>>;
+pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Vec<(Slot, Hash)>>;
pub type ConfirmedSlotsSender = CrossbeamSender<Vec<Slot>>;
pub type ConfirmedSlotsReceiver = CrossbeamReceiver<Vec<Slot>>;
pub type OutstandingShredRepairs = OutstandingRequests<ShredRepairType>;
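
Note the widened reset channel above: ReplayStage now receives the full list of correct ancestor `(Slot, Hash)` pairs to dump and repair, rather than a single slot. A minimal sketch of the new wiring, assuming the `CrossbeamSender`/`CrossbeamReceiver` aliases wrap `crossbeam_channel` and with illustrative `dead_slot`/`correct_hash` values:

let (duplicate_slots_reset_sender, duplicate_slots_reset_receiver): (
    DuplicateSlotsResetSender,
    DuplicateSlotsResetReceiver,
) = crossbeam_channel::unbounded();

// The ancestor hashes service reports every correct ancestor at once:
duplicate_slots_reset_sender
    .send(vec![(dead_slot, correct_hash)])
    .unwrap();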
@ -46,6 +47,12 @@ pub struct SlotRepairs {
pubkey_repairs: HashMap<Pubkey, u64>,
}
+impl SlotRepairs {
+pub fn pubkey_repairs(&self) -> &HashMap<Pubkey, u64> {
+&self.pubkey_repairs
+}
+}
#[derive(Default, Debug)]
pub struct RepairStatsGroup {
pub count: u64,
@ -111,8 +118,11 @@ pub const MAX_DUPLICATE_WAIT_MS: usize = 10_000;
pub const REPAIR_MS: u64 = 100;
pub const MAX_ORPHANS: usize = 5;
+#[derive(Clone)]
pub struct RepairInfo {
pub bank_forks: Arc<RwLock<BankForks>>,
+pub cluster_info: Arc<ClusterInfo>,
+pub cluster_slots: Arc<ClusterSlots>,
pub epoch_schedule: EpochSchedule,
pub duplicate_slots_reset_sender: DuplicateSlotsResetSender,
pub repair_validators: Option<HashSet<Pubkey>>,
@ -134,6 +144,7 @@ impl Default for RepairSlotRange {
pub struct RepairService {
t_repair: JoinHandle<()>,
+ancestor_hashes_service: AncestorHashesService,
}
impl RepairService {
@ -141,44 +152,54 @@ impl RepairService {
blockstore: Arc<Blockstore>,
exit: Arc<AtomicBool>,
repair_socket: Arc<UdpSocket>,
-cluster_info: Arc<ClusterInfo>,
repair_info: RepairInfo,
-cluster_slots: Arc<ClusterSlots>,
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
) -> Self {
-let t_repair = Builder::new()
-.name("solana-repair-service".to_string())
-.spawn(move || {
-Self::run(
-&blockstore,
-&exit,
-&repair_socket,
-cluster_info,
-repair_info,
-&cluster_slots,
-verified_vote_receiver,
-&outstanding_requests,
-)
-})
-.unwrap();
+let t_repair = {
+let blockstore = blockstore.clone();
+let exit = exit.clone();
+let repair_info = repair_info.clone();
+Builder::new()
+.name("solana-repair-service".to_string())
+.spawn(move || {
+Self::run(
+&blockstore,
+&exit,
+&repair_socket,
+repair_info,
+verified_vote_receiver,
+&outstanding_requests,
+)
+})
+.unwrap()
+};
-RepairService { t_repair }
+let ancestor_hashes_request_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap());
+let ancestor_hashes_service = AncestorHashesService::new(
+exit,
+blockstore,
+ancestor_hashes_request_socket,
+repair_info,
+);
+RepairService {
+t_repair,
+ancestor_hashes_service,
+}
}
fn run(
blockstore: &Blockstore,
exit: &AtomicBool,
repair_socket: &UdpSocket,
-cluster_info: Arc<ClusterInfo>,
repair_info: RepairInfo,
-cluster_slots: &ClusterSlots,
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
) {
let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
-let serve_repair = ServeRepair::new(cluster_info.clone());
-let id = cluster_info.id();
+let serve_repair = ServeRepair::new(repair_info.cluster_info.clone());
+let id = repair_info.cluster_info.id();
let mut repair_stats = RepairStats::default();
let mut repair_timing = RepairTiming::default();
let mut last_stats = Instant::now();
@ -243,7 +264,7 @@ impl RepairService {
let mut outstanding_requests = outstanding_requests.write().unwrap();
repairs.into_iter().for_each(|repair_request| {
if let Ok((to, req)) = serve_repair.repair_request(
-cluster_slots,
+&repair_info.cluster_slots,
repair_request,
&mut peers_cache,
&mut repair_stats,
@ -389,12 +410,12 @@ impl RepairService {
repairs: &mut Vec<ShredRepairType>,
max_repairs: usize,
slot: Slot,
-ancestor_hashes_request_statuses: &impl Contains<'a, Slot>,
+duplicate_slot_repair_statuses: &impl Contains<'a, Slot>,
) {
let mut pending_slots = vec![slot];
while repairs.len() < max_repairs && !pending_slots.is_empty() {
let slot = pending_slots.pop().unwrap();
-if ancestor_hashes_request_statuses.contains(&slot) {
+if duplicate_slot_repair_statuses.contains(&slot) {
// These are repaired through a different path
continue;
}
@ -554,7 +575,8 @@ impl RepairService {
}
pub fn join(self) -> thread::Result<()> {
-self.t_repair.join()
+self.t_repair.join()?;
+self.ancestor_hashes_service.join()
}
}
@ -875,7 +897,7 @@ mod test {
let cluster_slots = ClusterSlots::default();
let serve_repair =
ServeRepair::new(Arc::new(new_test_cluster_info(Node::new_localhost().info)));
-let mut ancestor_hashes_request_statuses = HashMap::new();
+let mut duplicate_slot_repair_statuses = HashMap::new();
let dead_slot = 9;
let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
let duplicate_status = DuplicateSlotRepairStatus {
@ -891,12 +913,12 @@ mod test {
.insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false)
.unwrap();
-ancestor_hashes_request_statuses.insert(dead_slot, duplicate_status);
+duplicate_slot_repair_statuses.insert(dead_slot, duplicate_status);
// There is no repair_addr, so should not get filtered because the timeout
// `std::u64::MAX` has not expired
RepairService::generate_and_send_duplicate_repairs(
-&mut ancestor_hashes_request_statuses,
+&mut duplicate_slot_repair_statuses,
&cluster_slots,
&blockstore,
&serve_repair,
@ -905,23 +927,23 @@ mod test {
&None,
&RwLock::new(OutstandingRequests::default()),
);
-assert!(ancestor_hashes_request_statuses
+assert!(duplicate_slot_repair_statuses
.get(&dead_slot)
.unwrap()
.repair_pubkey_and_addr
.is_none());
-assert!(ancestor_hashes_request_statuses.get(&dead_slot).is_some());
+assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
// Give the slot a repair address
-ancestor_hashes_request_statuses
+duplicate_slot_repair_statuses
.get_mut(&dead_slot)
.unwrap()
.repair_pubkey_and_addr =
Some((Pubkey::default(), receive_socket.local_addr().unwrap()));
-// Slot is not yet full, should not get filtered from `ancestor_hashes_request_statuses`
+// Slot is not yet full, should not get filtered from `duplicate_slot_repair_statuses`
RepairService::generate_and_send_duplicate_repairs(
-&mut ancestor_hashes_request_statuses,
+&mut duplicate_slot_repair_statuses,
&cluster_slots,
&blockstore,
&serve_repair,
@ -930,16 +952,16 @@ mod test {
&None,
&RwLock::new(OutstandingRequests::default()),
);
-assert_eq!(ancestor_hashes_request_statuses.len(), 1);
-assert!(ancestor_hashes_request_statuses.get(&dead_slot).is_some());
+assert_eq!(duplicate_slot_repair_statuses.len(), 1);
+assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
// Insert rest of shreds. Slot is full, should get filtered from
-// `ancestor_hashes_request_statuses`
+// `duplicate_slot_repair_statuses`
blockstore
.insert_shreds(vec![shreds.pop().unwrap()], None, false)
.unwrap();
RepairService::generate_and_send_duplicate_repairs(
-&mut ancestor_hashes_request_statuses,
+&mut duplicate_slot_repair_statuses,
&cluster_slots,
&blockstore,
&serve_repair,
@ -948,7 +970,7 @@ mod test {
&None,
&RwLock::new(OutstandingRequests::default()),
);
-assert!(ancestor_hashes_request_statuses.is_empty());
+assert!(duplicate_slot_repair_statuses.is_empty());
}
#[test]

core/src/retransmit_stage.rs

@ -574,10 +574,11 @@ impl RetransmitStage {
epoch_schedule,
duplicate_slots_reset_sender,
repair_validators,
+cluster_info: cluster_info.clone(),
+cluster_slots,
};
let window_service = WindowService::new(
blockstore,
-cluster_info.clone(),
verified_receiver,
retransmit_sender,
repair_socket,
@ -599,7 +600,6 @@ impl RetransmitStage {
);
rv && is_connected
},
-cluster_slots,
verified_vote_receiver,
completed_data_sets_sender,
duplicate_slots_sender,

core/src/serve_repair.rs

@ -95,20 +95,25 @@ impl RequestResponse for ShredRepairType {
}
}
-pub struct AncestorHashesRepair(Slot);
+pub struct AncestorHashesRepairType(pub Slot);
+impl AncestorHashesRepairType {
+pub fn slot(&self) -> Slot {
+self.0
+}
+}
#[derive(Serialize, Deserialize)]
pub enum AncestorHashesResponseVersion {
Current(Vec<SlotHash>),
}
impl AncestorHashesResponseVersion {
-#[cfg(test)]
-fn into_slot_hashes(self) -> Vec<SlotHash> {
+pub fn into_slot_hashes(self) -> Vec<SlotHash> {
match self {
AncestorHashesResponseVersion::Current(slot_hashes) => slot_hashes,
}
}
-fn slot_hashes(&self) -> &[SlotHash] {
+pub fn slot_hashes(&self) -> &[SlotHash] {
match self {
AncestorHashesResponseVersion::Current(slot_hashes) => slot_hashes,
}
@ -121,7 +126,7 @@ impl AncestorHashesResponseVersion {
}
}
-impl RequestResponse for AncestorHashesRepair {
+impl RequestResponse for AncestorHashesRepairType {
type Response = AncestorHashesResponseVersion;
fn num_expected_responses(&self) -> u32 {
1
@ -474,6 +479,16 @@ impl ServeRepair {
Ok(out)
}
+pub fn ancestor_repair_request_bytes(
+&self,
+request_slot: Slot,
+nonce: Nonce,
+) -> Result<Vec<u8>> {
+let repair_request = RepairProtocol::AncestorHashes(self.my_info(), request_slot, nonce);
+let out = serialize(&repair_request)?;
+Ok(out)
+}
pub(crate) fn repair_request(
&self,
cluster_slots: &ClusterSlots,
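
The new `ancestor_repair_request_bytes` above serializes a `RepairProtocol::AncestorHashes` message carrying the requesting node's contact info, the dead slot, and a nonce that later ties the response back to the outstanding request. A hedged usage sketch (the socket, peer address, and error propagation are illustrative, not part of this commit):

let request_bytes = serve_repair.ancestor_repair_request_bytes(dead_slot, nonce)?;
ancestor_hashes_request_socket.send_to(&request_bytes, &repair_peer_addr)?;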
@ -1346,7 +1361,7 @@ mod tests {
#[test]
fn test_verify_ancestor_response() {
let request_slot = MAX_ANCESTOR_RESPONSES as Slot;
-let repair = AncestorHashesRepair(request_slot);
+let repair = AncestorHashesRepairType(request_slot);
let mut response: Vec<SlotHash> = (0..request_slot)
.into_iter()
.map(|slot| (slot, Hash::new_unique()))

core/src/window_service.rs

@ -3,7 +3,6 @@
//!
use crate::{
cluster_info_vote_listener::VerifiedVoteReceiver,
-cluster_slots::ClusterSlots,
completed_data_sets_service::CompletedDataSetsSender,
outstanding_requests::OutstandingRequests,
repair_response,
@ -131,11 +130,14 @@ fn verify_repair(
repair_meta
.as_ref()
.map(|repair_meta| {
-outstanding_requests.register_response(
-repair_meta.nonce,
-shred,
-solana_sdk::timing::timestamp(),
-)
+outstanding_requests
+.register_response(
+repair_meta.nonce,
+shred,
+solana_sdk::timing::timestamp(),
+|_| (),
+)
+.is_some()
})
.unwrap_or(true)
}
@ -329,7 +331,6 @@ impl WindowService {
#[allow(clippy::too_many_arguments)]
pub fn new<F>(
blockstore: Arc<Blockstore>,
-cluster_info: Arc<ClusterInfo>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
retransmit: PacketSender,
repair_socket: Arc<UdpSocket>,
@ -337,7 +338,6 @@ impl WindowService {
repair_info: RepairInfo,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
shred_filter: F,
-cluster_slots: Arc<ClusterSlots>,
verified_vote_receiver: VerifiedVoteReceiver,
completed_data_sets_sender: CompletedDataSetsSender,
duplicate_slots_sender: DuplicateSlotSender,
@ -352,14 +352,14 @@ impl WindowService {
Arc::new(RwLock::new(OutstandingRequests::default()));
let bank_forks = repair_info.bank_forks.clone();
+let cluster_info = repair_info.cluster_info.clone();
+let id = cluster_info.id();
let repair_service = RepairService::new(
blockstore.clone(),
exit.clone(),
repair_socket,
-cluster_info.clone(),
repair_info,
-cluster_slots,
verified_vote_receiver,
outstanding_requests.clone(),
);
@ -368,7 +368,7 @@ impl WindowService {
let (duplicate_sender, duplicate_receiver) = unbounded();
let t_check_duplicate = Self::start_check_duplicate_thread(
-cluster_info.clone(),
+cluster_info,
exit.clone(),
blockstore.clone(),
duplicate_receiver,
@ -386,7 +386,7 @@ impl WindowService {
);
let t_window = Self::start_recv_window_thread(
-cluster_info.id(),
+id,
exit,
&blockstore,
insert_sender,