ancestor hashes socket ping/pong support (#26866)

This commit is contained in:
Jeff Biseda 2022-08-09 21:39:55 -07:00 committed by GitHub
parent ccfbc54195
commit 370de8129e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 270 additions and 159 deletions

View File

@ -4,29 +4,34 @@ use {
duplicate_repair_status::{DeadSlotAncestorRequestStatus, DuplicateAncestorDecision}, duplicate_repair_status::{DeadSlotAncestorRequestStatus, DuplicateAncestorDecision},
outstanding_requests::OutstandingRequests, outstanding_requests::OutstandingRequests,
packet_threshold::DynamicPacketToProcessThreshold, packet_threshold::DynamicPacketToProcessThreshold,
repair_response::{self},
repair_service::{DuplicateSlotsResetSender, RepairInfo, RepairStatsGroup}, repair_service::{DuplicateSlotsResetSender, RepairInfo, RepairStatsGroup},
replay_stage::DUPLICATE_THRESHOLD, replay_stage::DUPLICATE_THRESHOLD,
result::{Error, Result}, result::{Error, Result},
serve_repair::{AncestorHashesRepairType, ServeRepair}, serve_repair::{
AncestorHashesRepairType, AncestorHashesResponse, RepairProtocol, ServeRepair,
},
}, },
bincode::serialize,
crossbeam_channel::{unbounded, Receiver, Sender}, crossbeam_channel::{unbounded, Receiver, Sender},
dashmap::{mapref::entry::Entry::Occupied, DashMap}, dashmap::{mapref::entry::Entry::Occupied, DashMap},
solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE}, solana_gossip::{cluster_info::ClusterInfo, ping_pong::Pong},
solana_ledger::blockstore::Blockstore,
solana_perf::{ solana_perf::{
packet::{Packet, PacketBatch}, packet::{deserialize_from_with_limit, Packet, PacketBatch},
recycler::Recycler, recycler::Recycler,
}, },
solana_runtime::bank::Bank, solana_runtime::bank::Bank,
solana_sdk::{ solana_sdk::{
clock::{Slot, SLOT_MS}, clock::{Slot, SLOT_MS},
pubkey::Pubkey, pubkey::Pubkey,
signature::Signable,
signer::keypair::Keypair, signer::keypair::Keypair,
timing::timestamp, timing::timestamp,
}, },
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats}, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
std::{ std::{
collections::HashSet, collections::HashSet,
io::{Cursor, Read},
net::UdpSocket, net::UdpSocket,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
@ -62,27 +67,25 @@ type RetryableSlotsReceiver = Receiver<Slot>;
type OutstandingAncestorHashesRepairs = OutstandingRequests<AncestorHashesRepairType>; type OutstandingAncestorHashesRepairs = OutstandingRequests<AncestorHashesRepairType>;
#[derive(Default)] #[derive(Default)]
pub struct AncestorHashesResponsesStats { struct AncestorHashesResponsesStats {
pub total_packets: usize, total_packets: usize,
pub dropped_packets: usize, processed: usize,
pub invalid_packets: usize, dropped_packets: usize,
pub processed: usize, invalid_packets: usize,
ping_count: usize,
ping_err_verify_count: usize,
} }
impl AncestorHashesResponsesStats { impl AncestorHashesResponsesStats {
fn report(&mut self) { fn report(&mut self) {
inc_new_counter_info!( datapoint_info!(
"ancestor_hashes_responses-total_packets", "ancestor_hashes_responses",
self.total_packets ("total_packets", self.total_packets, i64),
); ("processed", self.processed, i64),
inc_new_counter_info!("ancestor_hashes_responses-processed", self.processed); ("dropped_packets", self.dropped_packets, i64),
inc_new_counter_info!( ("invalid_packets", self.invalid_packets, i64),
"ancestor_hashes_responses-dropped_packets", ("ping_count", self.ping_count, i64),
self.dropped_packets ("ping_err_verify_count", self.ping_err_verify_count, i64),
);
inc_new_counter_info!(
"ancestor_hashes_responses-invalid_packets",
self.invalid_packets
); );
*self = AncestorHashesResponsesStats::default(); *self = AncestorHashesResponsesStats::default();
} }
@ -174,6 +177,8 @@ impl AncestorHashesService {
exit.clone(), exit.clone(),
repair_info.duplicate_slots_reset_sender.clone(), repair_info.duplicate_slots_reset_sender.clone(),
retryable_slots_sender, retryable_slots_sender,
repair_info.cluster_info.clone(),
ancestor_hashes_request_socket.clone(),
); );
// Generate ancestor requests for dead slots that are repairable // Generate ancestor requests for dead slots that are repairable
@ -206,6 +211,8 @@ impl AncestorHashesService {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
duplicate_slots_reset_sender: DuplicateSlotsResetSender, duplicate_slots_reset_sender: DuplicateSlotsResetSender,
retryable_slots_sender: RetryableSlotsSender, retryable_slots_sender: RetryableSlotsSender,
cluster_info: Arc<ClusterInfo>,
ancestor_socket: Arc<UdpSocket>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
Builder::new() Builder::new()
.name("solana-ancestor-hashes-responses-service".to_string()) .name("solana-ancestor-hashes-responses-service".to_string())
@ -214,6 +221,7 @@ impl AncestorHashesService {
let mut stats = AncestorHashesResponsesStats::default(); let mut stats = AncestorHashesResponsesStats::default();
let mut packet_threshold = DynamicPacketToProcessThreshold::default(); let mut packet_threshold = DynamicPacketToProcessThreshold::default();
loop { loop {
let keypair = cluster_info.keypair().clone();
let result = Self::process_new_packets_from_channel( let result = Self::process_new_packets_from_channel(
&ancestor_hashes_request_statuses, &ancestor_hashes_request_statuses,
&response_receiver, &response_receiver,
@ -223,6 +231,8 @@ impl AncestorHashesService {
&mut packet_threshold, &mut packet_threshold,
&duplicate_slots_reset_sender, &duplicate_slots_reset_sender,
&retryable_slots_sender, &retryable_slots_sender,
&keypair,
&ancestor_socket,
); );
match result { match result {
Err(Error::RecvTimeout(_)) | Ok(_) => {} Err(Error::RecvTimeout(_)) | Ok(_) => {}
@ -241,6 +251,7 @@ impl AncestorHashesService {
} }
/// Process messages from the network /// Process messages from the network
#[allow(clippy::too_many_arguments)]
fn process_new_packets_from_channel( fn process_new_packets_from_channel(
ancestor_hashes_request_statuses: &DashMap<Slot, DeadSlotAncestorRequestStatus>, ancestor_hashes_request_statuses: &DashMap<Slot, DeadSlotAncestorRequestStatus>,
response_receiver: &PacketBatchReceiver, response_receiver: &PacketBatchReceiver,
@ -250,6 +261,8 @@ impl AncestorHashesService {
packet_threshold: &mut DynamicPacketToProcessThreshold, packet_threshold: &mut DynamicPacketToProcessThreshold,
duplicate_slots_reset_sender: &DuplicateSlotsResetSender, duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
retryable_slots_sender: &RetryableSlotsSender, retryable_slots_sender: &RetryableSlotsSender,
keypair: &Keypair,
ancestor_socket: &UdpSocket,
) -> Result<()> { ) -> Result<()> {
let timeout = Duration::new(1, 0); let timeout = Duration::new(1, 0);
let mut packet_batches = vec![response_receiver.recv_timeout(timeout)?]; let mut packet_batches = vec![response_receiver.recv_timeout(timeout)?];
@ -278,6 +291,8 @@ impl AncestorHashesService {
blockstore, blockstore,
duplicate_slots_reset_sender, duplicate_slots_reset_sender,
retryable_slots_sender, retryable_slots_sender,
keypair,
ancestor_socket,
); );
} }
packet_threshold.update(total_packets, timer.elapsed()); packet_threshold.update(total_packets, timer.elapsed());
@ -292,6 +307,8 @@ impl AncestorHashesService {
blockstore: &Blockstore, blockstore: &Blockstore,
duplicate_slots_reset_sender: &DuplicateSlotsResetSender, duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
retryable_slots_sender: &RetryableSlotsSender, retryable_slots_sender: &RetryableSlotsSender,
keypair: &Keypair,
ancestor_socket: &UdpSocket,
) { ) {
packet_batch.iter().for_each(|packet| { packet_batch.iter().for_each(|packet| {
let decision = Self::verify_and_process_ancestor_response( let decision = Self::verify_and_process_ancestor_response(
@ -300,6 +317,8 @@ impl AncestorHashesService {
stats, stats,
outstanding_requests, outstanding_requests,
blockstore, blockstore,
keypair,
ancestor_socket,
); );
if let Some((slot, decision)) = decision { if let Some((slot, decision)) = decision {
Self::handle_ancestor_request_decision( Self::handle_ancestor_request_decision(
@ -321,55 +340,104 @@ impl AncestorHashesService {
stats: &mut AncestorHashesResponsesStats, stats: &mut AncestorHashesResponsesStats,
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>, outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
blockstore: &Blockstore, blockstore: &Blockstore,
keypair: &Keypair,
ancestor_socket: &UdpSocket,
) -> Option<(Slot, DuplicateAncestorDecision)> { ) -> Option<(Slot, DuplicateAncestorDecision)> {
let from_addr = packet.meta.socket_addr(); let from_addr = packet.meta.socket_addr();
let ancestor_hashes_response = packet let packet_data = match packet.data(..) {
.deserialize_slice(..packet.meta.size.saturating_sub(SIZE_OF_NONCE)) Some(data) => data,
.ok()?; None => {
stats.invalid_packets += 1;
// Verify the response return None;
let request_slot = repair_response::nonce(packet).and_then(|nonce| { }
outstanding_requests.write().unwrap().register_response( };
nonce, let mut cursor = Cursor::new(packet_data);
&ancestor_hashes_response, let response = match deserialize_from_with_limit(&mut cursor) {
timestamp(), Ok(response) => response,
// If the response is valid, return the slot the request Err(_) => {
// was for stats.invalid_packets += 1;
|ancestor_hashes_request| ancestor_hashes_request.0, return None;
) }
}); };
if request_slot.is_none() { match response {
stats.invalid_packets += 1; AncestorHashesResponse::Hashes(ref hashes) => {
return None; // deserialize trailing nonce
} let nonce = match deserialize_from_with_limit(&mut cursor) {
Ok(nonce) => nonce,
// If was a valid response, there must be a valid `request_slot` Err(_) => {
let request_slot = request_slot.unwrap(); stats.invalid_packets += 1;
stats.processed += 1; return None;
}
if let Occupied(mut ancestor_hashes_status_ref) = };
ancestor_hashes_request_statuses.entry(request_slot)
{ // verify that packet does not contain extraneous data
let decision = ancestor_hashes_status_ref.get_mut().add_response( if cursor.bytes().next().is_some() {
&from_addr, stats.invalid_packets += 1;
ancestor_hashes_response.into_slot_hashes(), return None;
blockstore, }
);
if decision.is_some() { let request_slot = outstanding_requests.write().unwrap().register_response(
// Once a request is completed, remove it from the map so that new nonce,
// requests for the same slot can be made again if necessary. It's &response,
// important to hold the `write` lock here via timestamp(),
// `ancestor_hashes_status_ref` so that we don't race with deletion + // If the response is valid, return the slot the request
// insertion from the `t_ancestor_requests` thread, which may // was for
// 1) Remove expired statuses from `ancestor_hashes_request_statuses` |ancestor_hashes_request| ancestor_hashes_request.0,
// 2) Insert another new one via `manage_ancestor_requests()`. );
// In which case we wouldn't want to delete the newly inserted entry here.
ancestor_hashes_status_ref.remove(); if request_slot.is_none() {
stats.invalid_packets += 1;
return None;
}
// If was a valid response, there must be a valid `request_slot`
let request_slot = request_slot.unwrap();
stats.processed += 1;
if let Occupied(mut ancestor_hashes_status_ref) =
ancestor_hashes_request_statuses.entry(request_slot)
{
let decision = ancestor_hashes_status_ref.get_mut().add_response(
&from_addr,
hashes.clone(),
blockstore,
);
if decision.is_some() {
// Once a request is completed, remove it from the map so that new
// requests for the same slot can be made again if necessary. It's
// important to hold the `write` lock here via
// `ancestor_hashes_status_ref` so that we don't race with deletion +
// insertion from the `t_ancestor_requests` thread, which may
// 1) Remove expired statuses from `ancestor_hashes_request_statuses`
// 2) Insert another new one via `manage_ancestor_requests()`.
// In which case we wouldn't want to delete the newly inserted entry here.
ancestor_hashes_status_ref.remove();
}
decision.map(|decision| (request_slot, decision))
} else {
None
}
}
AncestorHashesResponse::Ping(ping) => {
// verify that packet does not contain extraneous data
if cursor.bytes().next().is_some() {
stats.invalid_packets += 1;
return None;
}
if ping.verify() {
stats.ping_count += 1;
if let Ok(pong) = Pong::new(&ping, keypair) {
let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) {
let _ignore = ancestor_socket.send_to(&pong_bytes[..], from_addr);
}
}
} else {
stats.ping_err_verify_count += 1;
}
None
} }
decision.map(|decision| (request_slot, decision))
} else {
None
} }
} }
@ -1145,6 +1213,8 @@ mod test {
&mut AncestorHashesResponsesStats::default(), &mut AncestorHashesResponsesStats::default(),
&outstanding_requests, &outstanding_requests,
&requester_blockstore, &requester_blockstore,
&requester_cluster_info.keypair(),
&ancestor_hashes_request_socket,
) )
.unwrap(); .unwrap();
@ -1385,7 +1455,9 @@ mod test {
let ManageAncestorHashesState { let ManageAncestorHashesState {
ancestor_hashes_request_statuses, ancestor_hashes_request_statuses,
ancestor_hashes_request_socket,
outstanding_requests, outstanding_requests,
repair_info,
.. ..
} = ManageAncestorHashesState::new(bank_forks); } = ManageAncestorHashesState::new(bank_forks);
@ -1402,6 +1474,8 @@ mod test {
&mut AncestorHashesResponsesStats::default(), &mut AncestorHashesResponsesStats::default(),
&outstanding_requests, &outstanding_requests,
&blockstore, &blockstore,
&repair_info.cluster_info.keypair(),
&ancestor_hashes_request_socket,
) )
.is_none()); .is_none());
} }
@ -1506,6 +1580,8 @@ mod test {
&mut AncestorHashesResponsesStats::default(), &mut AncestorHashesResponsesStats::default(),
&outstanding_requests, &outstanding_requests,
&requester_blockstore, &requester_blockstore,
&requester_cluster_info.keypair(),
&ancestor_hashes_request_socket,
) )
.unwrap(); .unwrap();

View File

@ -32,7 +32,7 @@ use {
solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{ solana_sdk::{
clock::Slot, clock::Slot,
feature_set::sign_repair_requests, feature_set::{check_ping_ancestor_requests, sign_repair_requests},
hash::{Hash, HASH_BYTES}, hash::{Hash, HASH_BYTES},
packet::PACKET_DATA_SIZE, packet::PACKET_DATA_SIZE,
pubkey::{Pubkey, PUBKEY_BYTES}, pubkey::{Pubkey, PUBKEY_BYTES},
@ -43,7 +43,6 @@ use {
}, },
solana_streamer::{ solana_streamer::{
sendmmsg::{batch_send, SendPktsError}, sendmmsg::{batch_send, SendPktsError},
socket::SocketAddrSpace,
streamer::{PacketBatchReceiver, PacketBatchSender}, streamer::{PacketBatchReceiver, PacketBatchSender},
}, },
std::{ std::{
@ -131,36 +130,21 @@ impl AncestorHashesRepairType {
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum AncestorHashesResponseVersion { pub enum AncestorHashesResponse {
Current(Vec<SlotHash>), Hashes(Vec<SlotHash>),
} Ping(Ping),
impl AncestorHashesResponseVersion {
pub fn into_slot_hashes(self) -> Vec<SlotHash> {
match self {
AncestorHashesResponseVersion::Current(slot_hashes) => slot_hashes,
}
}
pub fn slot_hashes(&self) -> &[SlotHash] {
match self {
AncestorHashesResponseVersion::Current(slot_hashes) => slot_hashes,
}
}
fn max_ancestors_in_response(&self) -> usize {
match self {
AncestorHashesResponseVersion::Current(_) => MAX_ANCESTOR_RESPONSES,
}
}
} }
impl RequestResponse for AncestorHashesRepairType { impl RequestResponse for AncestorHashesRepairType {
type Response = AncestorHashesResponseVersion; type Response = AncestorHashesResponse;
fn num_expected_responses(&self) -> u32 { fn num_expected_responses(&self) -> u32 {
1 1
} }
fn verify_response(&self, response: &AncestorHashesResponseVersion) -> bool { fn verify_response(&self, response: &AncestorHashesResponse) -> bool {
response.slot_hashes().len() <= response.max_ancestors_in_response() match response {
AncestorHashesResponse::Hashes(hashes) => hashes.len() <= MAX_ANCESTOR_RESPONSES,
AncestorHashesResponse::Ping(ping) => ping.verify(),
}
} }
} }
@ -241,7 +225,7 @@ pub enum RepairProtocol {
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
enum RepairResponse { pub(crate) enum RepairResponse {
Ping(Ping), Ping(Ping),
} }
@ -279,23 +263,6 @@ impl RepairProtocol {
| Self::AncestorHashes { .. } => true, | Self::AncestorHashes { .. } => true,
} }
} }
fn requires_ping_check(&self) -> bool {
match self {
Self::LegacyWindowIndex(_, _, _)
| Self::LegacyHighestWindowIndex(_, _, _)
| Self::LegacyOrphan(_, _)
| Self::LegacyWindowIndexWithNonce(_, _, _, _)
| Self::LegacyHighestWindowIndexWithNonce(_, _, _, _)
| Self::LegacyOrphanWithNonce(_, _, _)
| Self::LegacyAncestorHashes(_, _, _)
| Self::Pong(_)
| Self::AncestorHashes { .. } => false,
Self::WindowIndex { .. } | Self::HighestWindowIndex { .. } | Self::Orphan { .. } => {
true
}
}
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -475,6 +442,24 @@ impl ServeRepair {
} }
} }
fn check_ping_ancestor_requests_activated_epoch(root_bank: &Bank) -> Option<Epoch> {
root_bank
.feature_set
.activated_slot(&check_ping_ancestor_requests::id())
.map(|slot| root_bank.epoch_schedule().get_epoch(slot))
}
fn should_check_ping_ancestor_request(
slot: Slot,
root_bank: &Bank,
check_ping_ancestor_request_epoch: Option<Epoch>,
) -> bool {
match check_ping_ancestor_request_epoch {
None => false,
Some(feature_epoch) => feature_epoch < root_bank.epoch_schedule().get_epoch(slot),
}
}
/// Process messages from the network /// Process messages from the network
fn run_listen( fn run_listen(
&self, &self,
@ -682,26 +667,11 @@ impl ServeRepair {
request: &RepairProtocol, request: &RepairProtocol,
from_addr: &SocketAddr, from_addr: &SocketAddr,
identity_keypair: &Keypair, identity_keypair: &Keypair,
socket_addr_space: &SocketAddrSpace,
ping_cache: &mut PingCache, ping_cache: &mut PingCache,
pending_pings: &mut Vec<(SocketAddr, Ping)>, ) -> (bool, Option<Ping>) {
stats: &mut ServeRepairStats,
) -> bool {
if !ContactInfo::is_valid_address(from_addr, socket_addr_space) {
stats.err_malformed += 1;
return false;
}
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let mut pingf = move || Ping::new_rand(&mut rng, identity_keypair).ok(); let mut pingf = move || Ping::new_rand(&mut rng, identity_keypair).ok();
let (check, ping) = ping_cache.check(Instant::now(), (*request.sender(), *from_addr), &mut pingf)
ping_cache.check(Instant::now(), (*request.sender(), *from_addr), &mut pingf);
if let Some(ping) = ping {
pending_pings.push((*from_addr, ping));
}
if !check {
stats.pings_required += 1;
}
check
} }
fn requires_signature_check( fn requires_signature_check(
@ -727,6 +697,44 @@ impl ServeRepair {
} }
} }
fn ping_to_packet_mapper_by_request_variant(
request: &RepairProtocol,
dest_addr: SocketAddr,
root_bank: &Bank,
check_ping_ancestor_request_epoch: Option<Epoch>,
) -> Option<Box<dyn FnOnce(Ping) -> Option<Packet>>> {
match request {
RepairProtocol::LegacyWindowIndex(_, _, _)
| RepairProtocol::LegacyHighestWindowIndex(_, _, _)
| RepairProtocol::LegacyOrphan(_, _)
| RepairProtocol::LegacyWindowIndexWithNonce(_, _, _, _)
| RepairProtocol::LegacyHighestWindowIndexWithNonce(_, _, _, _)
| RepairProtocol::LegacyOrphanWithNonce(_, _, _)
| RepairProtocol::LegacyAncestorHashes(_, _, _)
| RepairProtocol::Pong(_) => None,
RepairProtocol::WindowIndex { .. }
| RepairProtocol::HighestWindowIndex { .. }
| RepairProtocol::Orphan { .. } => Some(Box::new(move |ping| {
let ping = RepairResponse::Ping(ping);
Packet::from_data(Some(&dest_addr), ping).ok()
})),
RepairProtocol::AncestorHashes { slot, .. } => {
if Self::should_check_ping_ancestor_request(
*slot,
root_bank,
check_ping_ancestor_request_epoch,
) {
Some(Box::new(move |ping| {
let ping = AncestorHashesResponse::Ping(ping);
Packet::from_data(Some(&dest_addr), ping).ok()
}))
} else {
None
}
}
}
}
fn handle_packets( fn handle_packets(
&self, &self,
ping_cache: &mut PingCache, ping_cache: &mut PingCache,
@ -739,6 +747,8 @@ impl ServeRepair {
data_budget: &DataBudget, data_budget: &DataBudget,
) { ) {
let sign_repairs_epoch = Self::sign_repair_requests_activated_epoch(root_bank); let sign_repairs_epoch = Self::sign_repair_requests_activated_epoch(root_bank);
let check_ping_ancestor_request_epoch =
Self::check_ping_ancestor_requests_activated_epoch(root_bank);
let identity_keypair = self.cluster_info.keypair().clone(); let identity_keypair = self.cluster_info.keypair().clone();
let socket_addr_space = *self.cluster_info.socket_addr_space(); let socket_addr_space = *self.cluster_info.socket_addr_space();
let my_id = identity_keypair.pubkey(); let my_id = identity_keypair.pubkey();
@ -772,18 +782,27 @@ impl ServeRepair {
} }
let from_addr = packet.meta.socket_addr(); let from_addr = packet.meta.socket_addr();
if request.requires_ping_check() if let Some(ping_to_pkt) = Self::ping_to_packet_mapper_by_request_variant(
&& !Self::check_ping_cache( &request,
&request, from_addr,
&from_addr, root_bank,
&identity_keypair, check_ping_ancestor_request_epoch,
&socket_addr_space, ) {
ping_cache, if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) {
&mut pending_pings, stats.err_malformed += 1;
stats, continue;
) }
{ let (check, ping) =
continue; Self::check_ping_cache(&request, &from_addr, &identity_keypair, ping_cache);
if let Some(ping) = ping {
if let Some(pkt) = ping_to_pkt(ping) {
pending_pings.push(pkt);
}
}
if !check {
stats.pings_required += 1;
continue;
}
} }
stats.processed += 1; stats.processed += 1;
@ -806,15 +825,8 @@ impl ServeRepair {
} }
if !pending_pings.is_empty() { if !pending_pings.is_empty() {
let packets: Vec<_> = pending_pings let batch = PacketBatch::new(pending_pings);
.into_iter() let _ignore = response_sender.send(batch);
.filter_map(|(sockaddr, ping)| {
let ping = RepairResponse::Ping(ping);
Packet::from_data(Some(&sockaddr), ping).ok()
})
.collect();
let batch = PacketBatch::new(packets);
let _ = response_sender.send(batch);
} }
} }
@ -1198,7 +1210,7 @@ impl ServeRepair {
// If this slot is not duplicate confirmed, return nothing // If this slot is not duplicate confirmed, return nothing
vec![] vec![]
}; };
let response = AncestorHashesResponseVersion::Current(ancestor_slot_hashes); let response = AncestorHashesResponse::Hashes(ancestor_slot_hashes);
let serialized_response = serialize(&response).ok()?; let serialized_response = serialize(&response).ok()?;
// Could probably directly write response into packet via `serialize_into()` // Could probably directly write response into packet via `serialize_into()`
@ -1961,7 +1973,7 @@ mod tests {
#[test] #[test]
fn test_run_ancestor_hashes() { fn test_run_ancestor_hashes() {
fn deserialize_ancestor_hashes_response(packet: &Packet) -> AncestorHashesResponseVersion { fn deserialize_ancestor_hashes_response(packet: &Packet) -> AncestorHashesResponse {
packet packet
.deserialize_slice(..packet.meta.size - SIZE_OF_NONCE) .deserialize_slice(..packet.meta.size - SIZE_OF_NONCE)
.unwrap() .unwrap()
@ -1996,7 +2008,14 @@ mod tests {
assert_eq!(rv.len(), 1); assert_eq!(rv.len(), 1);
let packet = &rv[0]; let packet = &rv[0];
let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet); let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet);
assert!(ancestor_hashes_response.into_slot_hashes().is_empty()); match ancestor_hashes_response {
AncestorHashesResponse::Hashes(hashes) => {
assert!(hashes.is_empty());
}
_ => {
panic!("unexpected response: {:?}", &ancestor_hashes_response);
}
}
// `slot + num_slots - 1` is not marked duplicate confirmed so nothing should return // `slot + num_slots - 1` is not marked duplicate confirmed so nothing should return
// empty // empty
@ -2011,7 +2030,14 @@ mod tests {
assert_eq!(rv.len(), 1); assert_eq!(rv.len(), 1);
let packet = &rv[0]; let packet = &rv[0];
let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet); let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet);
assert!(ancestor_hashes_response.into_slot_hashes().is_empty()); match ancestor_hashes_response {
AncestorHashesResponse::Hashes(hashes) => {
assert!(hashes.is_empty());
}
_ => {
panic!("unexpected response: {:?}", &ancestor_hashes_response);
}
}
// Set duplicate confirmed // Set duplicate confirmed
let mut expected_ancestors = Vec::with_capacity(num_slots as usize); let mut expected_ancestors = Vec::with_capacity(num_slots as usize);
@ -2033,10 +2059,14 @@ mod tests {
assert_eq!(rv.len(), 1); assert_eq!(rv.len(), 1);
let packet = &rv[0]; let packet = &rv[0];
let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet); let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet);
assert_eq!( match ancestor_hashes_response {
ancestor_hashes_response.into_slot_hashes(), AncestorHashesResponse::Hashes(hashes) => {
expected_ancestors assert_eq!(hashes, expected_ancestors);
); }
_ => {
panic!("unexpected response: {:?}", &ancestor_hashes_response);
}
}
} }
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
@ -2184,10 +2214,10 @@ mod tests {
.into_iter() .into_iter()
.map(|slot| (slot, Hash::new_unique())) .map(|slot| (slot, Hash::new_unique()))
.collect(); .collect();
assert!(repair.verify_response(&AncestorHashesResponseVersion::Current(response.clone()))); assert!(repair.verify_response(&AncestorHashesResponse::Hashes(response.clone())));
// over the allowed limit, should fail // over the allowed limit, should fail
response.push((request_slot, Hash::new_unique())); response.push((request_slot, Hash::new_unique()));
assert!(!repair.verify_response(&AncestorHashesResponseVersion::Current(response))); assert!(!repair.verify_response(&AncestorHashesResponse::Hashes(response)));
} }
} }

View File

@ -492,6 +492,10 @@ pub mod concurrent_replay_of_forks {
solana_sdk::declare_id!("9F2Dcu8xkBPKxiiy65XKPZYdCG3VZDpjDTuSmeYLozJe"); solana_sdk::declare_id!("9F2Dcu8xkBPKxiiy65XKPZYdCG3VZDpjDTuSmeYLozJe");
} }
pub mod check_ping_ancestor_requests {
solana_sdk::declare_id!("AXLB87anNaUQtqBSsxkm4gvNzYY985aLtNtpJC94uWLJ");
}
pub mod incremental_snapshot_only_incremental_hash_calculation { pub mod incremental_snapshot_only_incremental_hash_calculation {
solana_sdk::declare_id!("25vqsfjk7Nv1prsQJmA4Xu1bN61s8LXCBGUPp8Rfy1UF"); solana_sdk::declare_id!("25vqsfjk7Nv1prsQJmA4Xu1bN61s8LXCBGUPp8Rfy1UF");
} }
@ -617,6 +621,7 @@ lazy_static! {
(compact_vote_state_updates::id(), "Compact vote state updates to lower block size"), (compact_vote_state_updates::id(), "Compact vote state updates to lower block size"),
(sign_repair_requests::id(), "sign repair requests #26834"), (sign_repair_requests::id(), "sign repair requests #26834"),
(concurrent_replay_of_forks::id(), "Allow slots from different forks to be replayed concurrently #26465"), (concurrent_replay_of_forks::id(), "Allow slots from different forks to be replayed concurrently #26465"),
(check_ping_ancestor_requests::id(), "ancestor hash repair socket ping/pong support #26963"),
(incremental_snapshot_only_incremental_hash_calculation::id(), "only hash accounts in incremental snapshot during incremental snapshot creation #26799"), (incremental_snapshot_only_incremental_hash_calculation::id(), "only hash accounts in incremental snapshot during incremental snapshot creation #26799"),
(disable_cpi_setting_executable_and_rent_epoch::id(), "disable setting is_executable and_rent_epoch in CPI #26987"), (disable_cpi_setting_executable_and_rent_epoch::id(), "disable setting is_executable and_rent_epoch in CPI #26987"),
/*************** ADD NEW FEATURES HERE ***************/ /*************** ADD NEW FEATURES HERE ***************/