Add non-default repair nonce values (#16512)
* Track outstanding nonces in repair * Rework outstanding requests to use lru cache and randomize nonces Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
parent
36e11998c7
commit
8e69dd42c1
|
@ -45,6 +45,7 @@ pub mod ledger_cleanup_service;
|
|||
pub mod non_circulating_supply;
|
||||
pub mod optimistic_confirmation_verifier;
|
||||
pub mod optimistically_confirmed_bank_tracker;
|
||||
pub mod outstanding_requests;
|
||||
pub mod packet_hasher;
|
||||
pub mod ping_pong;
|
||||
pub mod poh_recorder;
|
||||
|
@ -55,6 +56,7 @@ pub mod repair_service;
|
|||
pub mod repair_weight;
|
||||
pub mod repair_weighted_traversal;
|
||||
pub mod replay_stage;
|
||||
pub mod request_response;
|
||||
mod result;
|
||||
pub mod retransmit_stage;
|
||||
pub mod rewards_recorder_service;
|
||||
|
|
|
@ -0,0 +1,177 @@
|
|||
use crate::request_response::RequestResponse;
|
||||
use lru::LruCache;
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_ledger::shred::Nonce;
|
||||
|
||||
pub const DEFAULT_REQUEST_EXPIRATION_MS: u64 = 60_000;
|
||||
|
||||
pub struct OutstandingRequests<T> {
|
||||
requests: LruCache<Nonce, RequestStatus<T>>,
|
||||
}
|
||||
|
||||
impl<T, S> OutstandingRequests<T>
|
||||
where
|
||||
T: RequestResponse<Response = S>,
|
||||
{
|
||||
// Returns boolean indicating whether sufficient time has passed for a request with
|
||||
// the given timestamp to be made
|
||||
pub fn add_request(&mut self, request: T, now: u64) -> Nonce {
|
||||
let num_expected_responses = request.num_expected_responses();
|
||||
let nonce = thread_rng().gen_range(0, Nonce::MAX);
|
||||
self.requests.put(
|
||||
nonce,
|
||||
RequestStatus {
|
||||
expire_timestamp: now + DEFAULT_REQUEST_EXPIRATION_MS,
|
||||
num_expected_responses,
|
||||
request,
|
||||
},
|
||||
);
|
||||
nonce
|
||||
}
|
||||
|
||||
pub fn register_response(&mut self, nonce: u32, response: &S, now: u64) -> bool {
|
||||
let (is_valid, should_delete) = self
|
||||
.requests
|
||||
.get_mut(&nonce)
|
||||
.map(|status| {
|
||||
if status.num_expected_responses > 0
|
||||
&& now < status.expire_timestamp
|
||||
&& status.request.verify_response(response)
|
||||
{
|
||||
status.num_expected_responses -= 1;
|
||||
(true, status.num_expected_responses == 0)
|
||||
} else {
|
||||
(false, true)
|
||||
}
|
||||
})
|
||||
.unwrap_or((false, false));
|
||||
|
||||
if should_delete {
|
||||
self.requests
|
||||
.pop(&nonce)
|
||||
.expect("Delete must delete existing object");
|
||||
}
|
||||
|
||||
is_valid
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for OutstandingRequests<T> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
requests: LruCache::new(16 * 1024),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RequestStatus<T> {
|
||||
expire_timestamp: u64,
|
||||
num_expected_responses: u32,
|
||||
request: T,
|
||||
}
|
||||
|
||||
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::serve_repair::RepairType;
    use solana_ledger::shred::Shred;
    use solana_sdk::timing::timestamp;

    type TestOutstanding = OutstandingRequests<RepairType>;

    // Reads the recorded expiry for `nonce`; panics if the entry is gone.
    fn expiry(requests: &mut TestOutstanding, nonce: Nonce) -> u64 {
        requests.requests.get(&nonce).unwrap().expire_timestamp
    }

    // Reads the remaining expected-response count for `nonce`; panics if the
    // entry is gone.
    fn remaining(requests: &mut TestOutstanding, nonce: Nonce) -> u32 {
        requests.requests.get(&nonce).unwrap().num_expected_responses
    }

    #[test]
    fn test_add_request() {
        let repair_type = RepairType::Orphan(9);
        let mut outstanding_requests = TestOutstanding::default();
        let nonce = outstanding_requests.add_request(repair_type, timestamp());
        // The stored status must reflect the request and its response budget.
        let status = outstanding_requests.requests.get(&nonce).unwrap();
        assert_eq!(status.request, repair_type);
        assert_eq!(
            status.num_expected_responses,
            repair_type.num_expected_responses()
        );
    }

    #[test]
    fn test_timeout_expired_remove() {
        let mut outstanding_requests = TestOutstanding::default();
        let nonce = outstanding_requests.add_request(RepairType::Orphan(9), timestamp());
        let shred = Shred::new_empty_data_shred();
        let expire_timestamp = expiry(&mut outstanding_requests, nonce);

        // A response arriving after expiry is rejected and evicts the entry.
        assert!(!outstanding_requests.register_response(nonce, &shred, expire_timestamp + 1));
        assert!(outstanding_requests.requests.get(&nonce).is_none());
    }

    #[test]
    fn test_register_response() {
        let repair_type = RepairType::Orphan(9);
        let mut outstanding_requests = TestOutstanding::default();
        let nonce = outstanding_requests.add_request(repair_type, timestamp());
        let shred = Shred::new_empty_data_shred();

        let expire_timestamp = expiry(&mut outstanding_requests, nonce);
        let mut num_expected_responses = remaining(&mut outstanding_requests, nonce);
        assert!(num_expected_responses > 1);

        // A response that passes every check decrements the expected count.
        assert!(outstanding_requests.register_response(nonce, &shred, expire_timestamp - 1));
        num_expected_responses -= 1;
        assert_eq!(
            remaining(&mut outstanding_requests, nonce),
            num_expected_responses
        );

        // Responses carrying an unknown nonce are ignored and leave the
        // tracked entry untouched.
        assert!(!outstanding_requests.register_response(nonce + 1, &shred, expire_timestamp - 1));
        assert!(!outstanding_requests.register_response(nonce + 1, &shred, expire_timestamp));
        assert_eq!(
            remaining(&mut outstanding_requests, nonce),
            num_expected_responses
        );

        // A response at/after the expiry removes the status, preventing late
        // responses from being accepted.
        assert!(!outstanding_requests.register_response(nonce, &shred, expire_timestamp));
        assert!(outstanding_requests.requests.get(&nonce).is_none());

        // Once the expected-response count hits zero the entry is removed too.
        let nonce = outstanding_requests.add_request(repair_type, timestamp());
        let expire_timestamp = expiry(&mut outstanding_requests, nonce);
        let num_expected_responses = remaining(&mut outstanding_requests, nonce);
        assert!(num_expected_responses > 1);
        for _ in 0..num_expected_responses {
            assert!(outstanding_requests.requests.get(&nonce).is_some());
            assert!(outstanding_requests.register_response(nonce, &shred, expire_timestamp - 1));
        }
        assert!(outstanding_requests.requests.get(&nonce).is_none());
    }
}
|
|
@ -4,9 +4,10 @@ use crate::{
|
|||
cluster_info::ClusterInfo,
|
||||
cluster_info_vote_listener::VerifiedVoteReceiver,
|
||||
cluster_slots::ClusterSlots,
|
||||
outstanding_requests::OutstandingRequests,
|
||||
repair_weight::RepairWeight,
|
||||
result::Result,
|
||||
serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE},
|
||||
serve_repair::{RepairType, ServeRepair},
|
||||
};
|
||||
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
|
||||
use solana_ledger::{
|
||||
|
@ -33,6 +34,8 @@ use std::{
|
|||
pub type DuplicateSlotsResetSender = CrossbeamSender<Slot>;
|
||||
pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Slot>;
|
||||
|
||||
pub type OutstandingRepairs = OutstandingRequests<RepairType>;
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct SlotRepairs {
|
||||
highest_shred_index: u64,
|
||||
|
@ -145,6 +148,7 @@ impl RepairService {
|
|||
repair_info: RepairInfo,
|
||||
cluster_slots: Arc<ClusterSlots>,
|
||||
verified_vote_receiver: VerifiedVoteReceiver,
|
||||
outstanding_requests: Arc<RwLock<OutstandingRepairs>>,
|
||||
) -> Self {
|
||||
let t_repair = Builder::new()
|
||||
.name("solana-repair-service".to_string())
|
||||
|
@ -157,6 +161,7 @@ impl RepairService {
|
|||
repair_info,
|
||||
&cluster_slots,
|
||||
verified_vote_receiver,
|
||||
&outstanding_requests,
|
||||
)
|
||||
})
|
||||
.unwrap();
|
||||
|
@ -172,6 +177,7 @@ impl RepairService {
|
|||
repair_info: RepairInfo,
|
||||
cluster_slots: &ClusterSlots,
|
||||
verified_vote_receiver: VerifiedVoteReceiver,
|
||||
outstanding_requests: &RwLock<OutstandingRepairs>,
|
||||
) {
|
||||
let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
|
||||
let serve_repair = ServeRepair::new(cluster_info.clone());
|
||||
|
@ -190,6 +196,7 @@ impl RepairService {
|
|||
let mut set_root_elapsed;
|
||||
let mut get_votes_elapsed;
|
||||
let mut add_votes_elapsed;
|
||||
|
||||
let repairs = {
|
||||
let root_bank = repair_info.bank_forks.read().unwrap().root_bank().clone();
|
||||
let new_root = root_bank.slot();
|
||||
|
@ -261,6 +268,7 @@ impl RepairService {
|
|||
|
||||
let mut cache = HashMap::new();
|
||||
let mut send_repairs_elapsed = Measure::start("send_repairs_elapsed");
|
||||
let mut outstanding_requests = outstanding_requests.write().unwrap();
|
||||
repairs.into_iter().for_each(|repair_request| {
|
||||
if let Ok((to, req)) = serve_repair.repair_request(
|
||||
&cluster_slots,
|
||||
|
@ -268,6 +276,7 @@ impl RepairService {
|
|||
&mut cache,
|
||||
&mut repair_stats,
|
||||
&repair_info.repair_validators,
|
||||
&mut outstanding_requests,
|
||||
) {
|
||||
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
|
||||
info!("{} repair req send_to({}) error {:?}", id, to, e);
|
||||
|
@ -467,6 +476,7 @@ impl RepairService {
|
|||
repair_stats: &mut RepairStats,
|
||||
repair_socket: &UdpSocket,
|
||||
repair_validators: &Option<HashSet<Pubkey>>,
|
||||
outstanding_requests: &RwLock<OutstandingRepairs>,
|
||||
) {
|
||||
duplicate_slot_repair_statuses.retain(|slot, status| {
|
||||
Self::update_duplicate_slot_repair_addr(
|
||||
|
@ -480,7 +490,9 @@ impl RepairService {
|
|||
let repairs = Self::generate_duplicate_repairs_for_slot(&blockstore, *slot);
|
||||
|
||||
if let Some(repairs) = repairs {
|
||||
let mut outstanding_requests = outstanding_requests.write().unwrap();
|
||||
for repair_type in repairs {
|
||||
let nonce = outstanding_requests.add_request(repair_type, timestamp());
|
||||
if let Err(e) = Self::serialize_and_send_request(
|
||||
&repair_type,
|
||||
repair_socket,
|
||||
|
@ -488,7 +500,7 @@ impl RepairService {
|
|||
&repair_addr,
|
||||
serve_repair,
|
||||
repair_stats,
|
||||
DEFAULT_NONCE,
|
||||
nonce,
|
||||
) {
|
||||
info!(
|
||||
"repair req send_to {} ({}) error {:?}",
|
||||
|
@ -688,7 +700,7 @@ mod test {
|
|||
MAX_ORPHANS,
|
||||
MAX_REPAIR_LENGTH,
|
||||
&HashSet::default(),
|
||||
None
|
||||
None,
|
||||
),
|
||||
vec![RepairType::Orphan(2), RepairType::HighestShred(0, 0)]
|
||||
);
|
||||
|
@ -987,6 +999,7 @@ mod test {
|
|||
&mut RepairStats::default(),
|
||||
&UdpSocket::bind("0.0.0.0:0").unwrap(),
|
||||
&None,
|
||||
&RwLock::new(OutstandingRequests::default()),
|
||||
);
|
||||
assert!(duplicate_slot_repair_statuses
|
||||
.get(&dead_slot)
|
||||
|
@ -1011,6 +1024,7 @@ mod test {
|
|||
&mut RepairStats::default(),
|
||||
&UdpSocket::bind("0.0.0.0:0").unwrap(),
|
||||
&None,
|
||||
&RwLock::new(OutstandingRequests::default()),
|
||||
);
|
||||
assert_eq!(duplicate_slot_repair_statuses.len(), 1);
|
||||
assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
|
||||
|
@ -1028,6 +1042,7 @@ mod test {
|
|||
&mut RepairStats::default(),
|
||||
&UdpSocket::bind("0.0.0.0:0").unwrap(),
|
||||
&None,
|
||||
&RwLock::new(OutstandingRequests::default()),
|
||||
);
|
||||
assert!(duplicate_slot_repair_statuses.is_empty());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
/// Contract between an outgoing request and the responses it may receive,
/// used by `OutstandingRequests` to budget and validate replies.
pub trait RequestResponse {
    /// The message type received in reply to this request.
    type Response;
    /// Upper bound on how many responses this request may legitimately
    /// produce before it is considered satisfied.
    fn num_expected_responses(&self) -> u32;
    /// Returns true when `response` is a valid answer to this request.
    fn verify_response(&self, response: &Self::Response) -> bool;
}
|
|
@ -3,13 +3,17 @@ use crate::{
|
|||
cluster_slots::ClusterSlots,
|
||||
contact_info::ContactInfo,
|
||||
repair_response,
|
||||
repair_service::RepairStats,
|
||||
repair_service::{OutstandingRepairs, RepairStats},
|
||||
request_response::RequestResponse,
|
||||
result::{Error, Result},
|
||||
weighted_shuffle::weighted_best,
|
||||
};
|
||||
use bincode::serialize;
|
||||
use rand::distributions::{Distribution, WeightedIndex};
|
||||
use solana_ledger::{blockstore::Blockstore, shred::Nonce};
|
||||
use solana_ledger::{
|
||||
blockstore::Blockstore,
|
||||
shred::{Nonce, Shred},
|
||||
};
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_metrics::inc_new_counter_debug;
|
||||
use solana_perf::packet::{limited_deserialize, Packets, PacketsRecycler};
|
||||
|
@ -31,7 +35,6 @@ use std::{
|
|||
|
||||
/// the number of slots to respond with when responding to `Orphan` requests
|
||||
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
|
||||
pub const DEFAULT_NONCE: u32 = 42;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
|
||||
pub enum RepairType {
|
||||
|
@ -50,6 +53,28 @@ impl RepairType {
|
|||
}
|
||||
}
|
||||
|
||||
impl RequestResponse for RepairType {
|
||||
type Response = Shred;
|
||||
fn num_expected_responses(&self) -> u32 {
|
||||
match self {
|
||||
RepairType::Orphan(_) => (MAX_ORPHAN_REPAIR_RESPONSES + 1) as u32, // run_orphan uses <= MAX_ORPHAN_REPAIR_RESPONSES
|
||||
RepairType::HighestShred(_, _) => 1,
|
||||
RepairType::Shred(_, _) => 1,
|
||||
}
|
||||
}
|
||||
fn verify_response(&self, response_shred: &Shred) -> bool {
|
||||
match self {
|
||||
RepairType::Orphan(slot) => response_shred.slot() <= *slot,
|
||||
RepairType::HighestShred(slot, index) => {
|
||||
response_shred.slot() as u64 == *slot && response_shred.index() as u64 >= *index
|
||||
}
|
||||
RepairType::Shred(slot, index) => {
|
||||
response_shred.slot() as u64 == *slot && response_shred.index() as u64 == *index
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ServeRepairStats {
|
||||
pub total_packets: usize,
|
||||
|
@ -376,6 +401,7 @@ impl ServeRepair {
|
|||
cache: &mut RepairCache,
|
||||
repair_stats: &mut RepairStats,
|
||||
repair_validators: &Option<HashSet<Pubkey>>,
|
||||
outstanding_requests: &mut OutstandingRepairs,
|
||||
) -> Result<(SocketAddr, Vec<u8>)> {
|
||||
// find a peer that appears to be accepting replication and has the desired slot, as indicated
|
||||
// by a valid tvu port location
|
||||
|
@ -395,13 +421,10 @@ impl ServeRepair {
|
|||
};
|
||||
let n = weighted_index.sample(&mut rand::thread_rng());
|
||||
let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port
|
||||
let nonce =
|
||||
outstanding_requests.add_request(repair_request, solana_sdk::timing::timestamp());
|
||||
let repair_peer_id = repair_peers[n].id;
|
||||
let out = self.map_repair_request(
|
||||
&repair_request,
|
||||
&repair_peer_id,
|
||||
repair_stats,
|
||||
DEFAULT_NONCE,
|
||||
)?;
|
||||
let out = self.map_repair_request(&repair_request, &repair_peer_id, repair_stats, nonce)?;
|
||||
Ok((addr, out))
|
||||
}
|
||||
|
||||
|
@ -592,6 +615,7 @@ mod tests {
|
|||
max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader,
|
||||
},
|
||||
};
|
||||
use solana_perf::packet::Packet;
|
||||
use solana_sdk::{hash::Hash, pubkey::Pubkey, timing::timestamp};
|
||||
|
||||
#[test]
|
||||
|
@ -634,6 +658,8 @@ mod tests {
|
|||
nonce,
|
||||
)
|
||||
.expect("packets");
|
||||
let request = RepairType::HighestShred(slot, index);
|
||||
verify_responses(&request, rv.packets.iter());
|
||||
|
||||
let rv: Vec<Shred> = rv
|
||||
.packets
|
||||
|
@ -731,6 +757,8 @@ mod tests {
|
|||
nonce,
|
||||
)
|
||||
.expect("packets");
|
||||
let request = RepairType::Shred(slot, index);
|
||||
verify_responses(&request, rv.packets.iter());
|
||||
let rv: Vec<Shred> = rv
|
||||
.packets
|
||||
.into_iter()
|
||||
|
@ -752,12 +780,14 @@ mod tests {
|
|||
let me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
|
||||
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(me));
|
||||
let serve_repair = ServeRepair::new(cluster_info.clone());
|
||||
let mut outstanding_requests = OutstandingRepairs::default();
|
||||
let rv = serve_repair.repair_request(
|
||||
&cluster_slots,
|
||||
RepairType::Shred(0, 0),
|
||||
&mut HashMap::new(),
|
||||
&mut RepairStats::default(),
|
||||
&None,
|
||||
&mut outstanding_requests,
|
||||
);
|
||||
assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
|
||||
|
||||
|
@ -785,6 +815,7 @@ mod tests {
|
|||
&mut HashMap::new(),
|
||||
&mut RepairStats::default(),
|
||||
&None,
|
||||
&mut outstanding_requests,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(nxt.serve_repair, serve_repair_addr);
|
||||
|
@ -818,6 +849,7 @@ mod tests {
|
|||
&mut HashMap::new(),
|
||||
&mut RepairStats::default(),
|
||||
&None,
|
||||
&mut outstanding_requests,
|
||||
)
|
||||
.unwrap();
|
||||
if rv.0 == serve_repair_addr {
|
||||
|
@ -886,6 +918,9 @@ mod tests {
|
|||
.collect();
|
||||
|
||||
// Verify responses
|
||||
let request = RepairType::Orphan(slot);
|
||||
verify_responses(&request, rv.iter());
|
||||
|
||||
let expected: Vec<_> = (slot..slot + num_slots)
|
||||
.rev()
|
||||
.filter_map(|slot| {
|
||||
|
@ -994,6 +1029,7 @@ mod tests {
|
|||
&mut HashMap::new(),
|
||||
&mut RepairStats::default(),
|
||||
&trusted_validators,
|
||||
&mut OutstandingRepairs::default(),
|
||||
)
|
||||
.is_err());
|
||||
}
|
||||
|
@ -1010,6 +1046,7 @@ mod tests {
|
|||
&mut HashMap::new(),
|
||||
&mut RepairStats::default(),
|
||||
&trusted_validators,
|
||||
&mut OutstandingRepairs::default(),
|
||||
)
|
||||
.is_ok());
|
||||
|
||||
|
@ -1030,7 +1067,68 @@ mod tests {
|
|||
&mut HashMap::new(),
|
||||
&mut RepairStats::default(),
|
||||
&None,
|
||||
&mut OutstandingRepairs::default(),
|
||||
)
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
#[test]
fn test_verify_response() {
    // Exhaustive match so this test fails to compile when new RepairType
    // variants are added without updating the coverage below.
    let repair = RepairType::Orphan(9);
    match repair {
        RepairType::Orphan(_) => (),
        RepairType::HighestShred(_, _) => (),
        RepairType::Shred(_, _) => (),
    };

    let slot = 9;
    let index = 5;

    // Orphan: any shred at or below the requested slot is accepted.
    let mut shred = Shred::new_empty_data_shred();
    shred.set_slot(slot);
    let request = RepairType::Orphan(slot);
    assert!(request.verify_response(&shred));
    shred.set_slot(slot - 1);
    assert!(request.verify_response(&shred));
    shred.set_slot(slot + 1);
    assert!(!request.verify_response(&shred));

    // HighestShred: exact slot, index at or above the requested one.
    shred = Shred::new_empty_data_shred();
    shred.set_slot(slot);
    shred.set_index(index);
    let request = RepairType::HighestShred(slot, index as u64);
    assert!(request.verify_response(&shred));
    shred.set_index(index + 1);
    assert!(request.verify_response(&shred));
    shred.set_index(index - 1);
    assert!(!request.verify_response(&shred));
    shred.set_slot(slot - 1);
    shred.set_index(index);
    assert!(!request.verify_response(&shred));
    shred.set_slot(slot + 1);
    assert!(!request.verify_response(&shred));

    // Shred: only the exact (slot, index) pair is accepted.
    shred = Shred::new_empty_data_shred();
    shred.set_slot(slot);
    shred.set_index(index);
    let request = RepairType::Shred(slot, index as u64);
    assert!(request.verify_response(&shred));
    shred.set_index(index + 1);
    assert!(!request.verify_response(&shred));
    shred.set_slot(slot + 1);
    shred.set_index(index);
    assert!(!request.verify_response(&shred));
}
|
||||
|
||||
fn verify_responses<'a>(request: &RepairType, packets: impl Iterator<Item = &'a Packet>) {
|
||||
for packet in packets {
|
||||
let shred_payload = packet.data.to_vec();
|
||||
let shred = Shred::new_from_serialized_shred(shred_payload).unwrap();
|
||||
request.verify_response(&shred);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,10 +6,10 @@ use crate::{
|
|||
cluster_info_vote_listener::VerifiedVoteReceiver,
|
||||
cluster_slots::ClusterSlots,
|
||||
completed_data_sets_service::CompletedDataSetsSender,
|
||||
outstanding_requests::OutstandingRequests,
|
||||
repair_response,
|
||||
repair_service::{RepairInfo, RepairService},
|
||||
repair_service::{OutstandingRepairs, RepairInfo, RepairService},
|
||||
result::{Error, Result},
|
||||
serve_repair::DEFAULT_NONCE,
|
||||
};
|
||||
use crossbeam_channel::{
|
||||
unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
|
||||
|
@ -28,6 +28,7 @@ use solana_rayon_threadlimit::get_thread_count;
|
|||
use solana_runtime::{bank::Bank, bank_forks::BankForks};
|
||||
use solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms};
|
||||
use solana_streamer::streamer::PacketSender;
|
||||
use std::collections::HashSet;
|
||||
use std::{
|
||||
net::{SocketAddr, UdpSocket},
|
||||
sync::atomic::{AtomicBool, Ordering},
|
||||
|
@ -123,13 +124,50 @@ fn run_check_duplicate(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn verify_repair(repair_info: &Option<RepairMeta>) -> bool {
|
||||
repair_info
|
||||
fn verify_repair(
|
||||
outstanding_requests: &mut OutstandingRepairs,
|
||||
shred: &Shred,
|
||||
repair_meta: &Option<RepairMeta>,
|
||||
) -> bool {
|
||||
repair_meta
|
||||
.as_ref()
|
||||
.map(|repair_info| repair_info.nonce == DEFAULT_NONCE)
|
||||
.map(|repair_meta| {
|
||||
outstanding_requests.register_response(
|
||||
repair_meta.nonce,
|
||||
&shred,
|
||||
solana_sdk::timing::timestamp(),
|
||||
)
|
||||
})
|
||||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
fn prune_shreds_invalid_repair(
|
||||
shreds: &mut Vec<Shred>,
|
||||
repair_infos: &mut Vec<Option<RepairMeta>>,
|
||||
outstanding_requests: &Arc<RwLock<OutstandingRepairs>>,
|
||||
) {
|
||||
assert_eq!(shreds.len(), repair_infos.len());
|
||||
let mut i = 0;
|
||||
let mut removed = HashSet::new();
|
||||
{
|
||||
let mut outstanding_requests = outstanding_requests.write().unwrap();
|
||||
shreds.retain(|shred| {
|
||||
let should_keep = (
|
||||
verify_repair(&mut outstanding_requests, &shred, &repair_infos[i]),
|
||||
i += 1,
|
||||
)
|
||||
.0;
|
||||
if !should_keep {
|
||||
removed.insert(i - 1);
|
||||
}
|
||||
should_keep
|
||||
});
|
||||
}
|
||||
i = 0;
|
||||
repair_infos.retain(|_repair_info| (!removed.contains(&i), i += 1).0);
|
||||
assert_eq!(shreds.len(), repair_infos.len());
|
||||
}
|
||||
|
||||
fn run_insert<F>(
|
||||
shred_receiver: &CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
|
@ -137,6 +175,7 @@ fn run_insert<F>(
|
|||
handle_duplicate: F,
|
||||
metrics: &mut BlockstoreInsertionMetrics,
|
||||
completed_data_sets_sender: &CompletedDataSetsSender,
|
||||
outstanding_requests: &Arc<RwLock<OutstandingRepairs>>,
|
||||
) -> Result<()>
|
||||
where
|
||||
F: Fn(Shred),
|
||||
|
@ -148,11 +187,7 @@ where
|
|||
repair_infos.extend(more_repair_infos);
|
||||
}
|
||||
|
||||
assert_eq!(shreds.len(), repair_infos.len());
|
||||
let mut i = 0;
|
||||
shreds.retain(|_shred| (verify_repair(&repair_infos[i]), i += 1).0);
|
||||
repair_infos.retain(|repair_info| verify_repair(&repair_info));
|
||||
assert_eq!(shreds.len(), repair_infos.len());
|
||||
prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests);
|
||||
|
||||
let (completed_data_sets, inserted_indices) = blockstore.insert_shreds_handle_duplicate(
|
||||
shreds,
|
||||
|
@ -334,6 +369,9 @@ impl WindowService {
|
|||
+ std::marker::Send
|
||||
+ std::marker::Sync,
|
||||
{
|
||||
let outstanding_requests: Arc<RwLock<OutstandingRepairs>> =
|
||||
Arc::new(RwLock::new(OutstandingRequests::default()));
|
||||
|
||||
let bank_forks = Some(repair_info.bank_forks.clone());
|
||||
|
||||
let repair_service = RepairService::new(
|
||||
|
@ -344,6 +382,7 @@ impl WindowService {
|
|||
repair_info,
|
||||
cluster_slots,
|
||||
verified_vote_receiver,
|
||||
outstanding_requests.clone(),
|
||||
);
|
||||
|
||||
let (insert_sender, insert_receiver) = unbounded();
|
||||
|
@ -364,6 +403,7 @@ impl WindowService {
|
|||
insert_receiver,
|
||||
duplicate_sender,
|
||||
completed_data_sets_sender,
|
||||
outstanding_requests,
|
||||
);
|
||||
|
||||
let t_window = Self::start_recv_window_thread(
|
||||
|
@ -424,6 +464,7 @@ impl WindowService {
|
|||
insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
|
||||
check_duplicate_sender: CrossbeamSender<Shred>,
|
||||
completed_data_sets_sender: CompletedDataSetsSender,
|
||||
outstanding_requests: Arc<RwLock<OutstandingRepairs>>,
|
||||
) -> JoinHandle<()> {
|
||||
let exit = exit.clone();
|
||||
let blockstore = blockstore.clone();
|
||||
|
@ -453,6 +494,7 @@ impl WindowService {
|
|||
&handle_duplicate,
|
||||
&mut metrics,
|
||||
&completed_data_sets_sender,
|
||||
&outstanding_requests,
|
||||
) {
|
||||
if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
|
||||
break;
|
||||
|
@ -720,4 +762,31 @@ mod test {
|
|||
duplicate_shred_slot
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
fn test_prune_shreds() {
    use crate::serve_repair::RepairType;
    use std::net::{IpAddr, Ipv4Addr};
    solana_logger::setup();
    let (common, coding) = Shredder::new_coding_shred_header(5, 5, 5, 6, 6, 0, 0);
    let shred = Shred::new_empty_from_header(common, DataShredHeader::default(), coding);
    let mut shreds = vec![shred.clone(), shred.clone(), shred];
    let _from_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    // Nonce 0 matches no outstanding request, so this repair shred is pruned.
    let stale_meta = RepairMeta {
        _from_addr,
        nonce: 0,
    };
    let outstanding_requests = Arc::new(RwLock::new(OutstandingRepairs::default()));
    let nonce = outstanding_requests
        .write()
        .unwrap()
        .add_request(RepairType::Orphan(9), timestamp());
    let live_meta = RepairMeta { _from_addr, nonce };
    let mut repair_infos = vec![None, Some(stale_meta), Some(live_meta)];
    prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, &outstanding_requests);
    // The non-repair shred and the valid-nonce repair survive; the stale
    // repair is dropped from both parallel vectors.
    assert_eq!(repair_infos.len(), 2);
    assert!(repair_infos[0].is_none());
    assert_eq!(repair_infos[1].as_ref().unwrap().nonce, nonce);
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue