From 1171002f467cb09c67271ddff3357d7b1e6fe46e Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Mon, 28 Aug 2023 21:34:09 +0000
Subject: [PATCH] adds repair-protocol identifying serve-repair socket (#33028)

Working towards migrating repair over the QUIC protocol, the commit adds a
repair-protocol argument for identifying the right serve-repair socket.
---
 core/src/repair/ancestor_hashes_service.rs | 91 +++++++++--------
 core/src/repair/repair_service.rs | 14 ++-
 core/src/repair/serve_repair.rs | 108 +++++++++++++--------
 core/src/repair/serve_repair_service.rs | 7 +-
 4 files changed, 126 insertions(+), 94 deletions(-)

diff --git a/core/src/repair/ancestor_hashes_service.rs b/core/src/repair/ancestor_hashes_service.rs
index ea3b10d642..0be84e2f74 100644
--- a/core/src/repair/ancestor_hashes_service.rs
+++ b/core/src/repair/ancestor_hashes_service.rs
@@ -9,7 +9,7 @@ use {
         packet_threshold::DynamicPacketToProcessThreshold,
         repair_service::{AncestorDuplicateSlotsSender, RepairInfo, RepairStatsGroup},
         serve_repair::{
-            AncestorHashesRepairType, AncestorHashesResponse, RepairProtocol, ServeRepair,
+            self, AncestorHashesRepairType, AncestorHashesResponse, RepairProtocol, ServeRepair,
         },
     },
     replay_stage::DUPLICATE_THRESHOLD,
@@ -17,7 +17,7 @@ use {
     bincode::serialize,
     crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
     dashmap::{mapref::entry::Entry::Occupied, DashMap},
-    solana_gossip::{cluster_info::ClusterInfo, ping_pong::Pong},
+    solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol, ping_pong::Pong},
     solana_ledger::blockstore::Blockstore,
     solana_perf::{
         packet::{deserialize_from_with_limit, Packet, PacketBatch},
@@ -26,6 +26,7 @@ use {
     solana_runtime::bank::Bank,
     solana_sdk::{
         clock::{Slot, DEFAULT_MS_PER_SLOT},
+        genesis_config::ClusterType,
         pubkey::Pubkey,
         signature::Signable,
         signer::keypair::Keypair,
@@ -207,11 +208,8 @@ impl AncestorHashesService {
         Self { thread_hdls }
     }

-    pub fn join(self) -> thread::Result<()> {
-        for thread_hdl in self.thread_hdls {
-            thread_hdl.join()?;
-        }
-        Ok(())
+    pub(crate) fn join(self) -> thread::Result<()> {
+        self.thread_hdls.into_iter().try_for_each(JoinHandle::join)
     }

     /// Listen for responses to our ancestors hashes repair requests
@@ -232,7 +230,7 @@ impl AncestorHashesService {
                 let mut last_stats_report = Instant::now();
                 let mut stats = AncestorHashesResponsesStats::default();
                 let mut packet_threshold = DynamicPacketToProcessThreshold::default();
-                loop {
+                while !exit.load(Ordering::Relaxed) {
                     let keypair = cluster_info.keypair().clone();
                     let result = Self::process_new_packets_from_channel(
                         &ancestor_hashes_request_statuses,
@@ -253,9 +251,6 @@ impl AncestorHashesService {
                             return;
                         }
                     };
-                    if exit.load(Ordering::Relaxed) {
-                        return;
-                    }
                     if last_stats_report.elapsed().as_secs() > 2 {
                         stats.report();
                         last_stats_report = Instant::now();
@@ -639,6 +634,7 @@ impl AncestorHashesService {
         request_throttle: &mut Vec<u64>,
     ) {
         let root_bank = repair_info.bank_forks.read().unwrap().root_bank();
+        let cluster_type = root_bank.cluster_type();
         for (slot, request_type) in retryable_slots_receiver.try_iter() {
             datapoint_info!("ancestor-repair-retry", ("slot", slot, i64));
             if request_type.is_pruned() {
@@ -732,6 +728,7 @@ impl AncestorHashesService {
                 outstanding_requests,
                 identity_keypair,
                 request_type,
+                cluster_type,
             ) {
                 request_throttle.push(timestamp());
                 if request_type.is_pruned() {
@@ -808,46 +805,53 @@ impl AncestorHashesService {
         outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
         identity_keypair: &Keypair,
         request_type: AncestorRequestType,
+        cluster_type: ClusterType,
     ) -> bool {
-        let sampled_validators = serve_repair.repair_request_ancestor_hashes_sample_peers(
+        let repair_protocol = serve_repair::get_repair_protocol(cluster_type);
+        let Ok(sampled_validators) = serve_repair.repair_request_ancestor_hashes_sample_peers(
             duplicate_slot,
             cluster_slots,
             repair_validators,
-        );
+            repair_protocol,
+        ) else {
+            return false;
+        };

-        if let Ok(sampled_validators) = sampled_validators {
-            for (pubkey, socket_addr) in sampled_validators.iter() {
-                repair_stats
-                    .ancestor_requests
-                    .update(pubkey, duplicate_slot, 0);
-                let nonce = outstanding_requests
-                    .write()
-                    .unwrap()
-                    .add_request(AncestorHashesRepairType(duplicate_slot), timestamp());
-                let request_bytes = serve_repair.ancestor_repair_request_bytes(
-                    identity_keypair,
-                    pubkey,
-                    duplicate_slot,
-                    nonce,
-                );
-                if let Ok(request_bytes) = request_bytes {
+        for (pubkey, socket_addr) in &sampled_validators {
+            repair_stats
+                .ancestor_requests
+                .update(pubkey, duplicate_slot, 0);
+            let nonce = outstanding_requests
+                .write()
+                .unwrap()
+                .add_request(AncestorHashesRepairType(duplicate_slot), timestamp());
+            let Ok(request_bytes) = serve_repair.ancestor_repair_request_bytes(
+                identity_keypair,
+                pubkey,
+                duplicate_slot,
+                nonce,
+            ) else {
+                continue;
+            };
+            match repair_protocol {
+                Protocol::UDP => {
                     let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket_addr);
                 }
+                Protocol::QUIC => todo!(),
             }
-
-            let ancestor_request_status = AncestorRequestStatus::new(
-                sampled_validators
-                    .into_iter()
-                    .map(|(_pk, socket_addr)| socket_addr),
-                duplicate_slot,
-                request_type,
-            );
-            assert!(!ancestor_hashes_request_statuses.contains_key(&duplicate_slot));
-            ancestor_hashes_request_statuses.insert(duplicate_slot, ancestor_request_status);
-            true
-        } else {
-            false
         }
+
+        let ancestor_request_status = AncestorRequestStatus::new(
+            sampled_validators
+                .into_iter()
+                .map(|(_pk, socket_addr)| socket_addr),
+            duplicate_slot,
+            request_type,
+        );
+        assert!(ancestor_hashes_request_statuses
+            .insert(duplicate_slot, ancestor_request_status)
+            .is_none());
+        true
     }
 }
@@ -1451,6 +1455,7 @@ mod test {
             &outstanding_requests,
             &requester_cluster_info.keypair(),
             AncestorRequestType::DeadDuplicateConfirmed,
+            ClusterType::Development,
         );

         assert!(ancestor_hashes_request_statuses.is_empty());
@@ -1499,6 +1504,7 @@ mod test {
             &outstanding_requests,
             &requester_cluster_info.keypair(),
             AncestorRequestType::DeadDuplicateConfirmed,
+            ClusterType::Development,
         );

         assert_eq!(ancestor_hashes_request_statuses.len(), 1);
@@ -1559,6 +1565,7 @@ mod test {
             &outstanding_requests,
             &requester_cluster_info.keypair(),
             AncestorRequestType::PopularPruned,
+            ClusterType::Development,
         );

         assert_eq!(ancestor_hashes_request_statuses.len(), 1);
diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs
index fda0931a45..592500929d 100644
--- a/core/src/repair/repair_service.rs
+++ b/core/src/repair/repair_service.rs
@@ -14,7 +14,7 @@ use {
             duplicate_repair_status::AncestorDuplicateSlotToRepair,
             outstanding_requests::OutstandingRequests,
             repair_weight::RepairWeight,
-            serve_repair::{ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY},
+            serve_repair::{self, ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY},
         },
     },
     crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender},
@@ -305,17 +305,14 @@ impl RepairService {
         let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY);
         let mut popular_pruned_forks_requests = HashSet::new();

-        loop {
-            if exit.load(Ordering::Relaxed) {
-                break;
-            }
-
+        while !exit.load(Ordering::Relaxed) {
             let mut set_root_elapsed;
             let mut dump_slots_elapsed;
             let mut get_votes_elapsed;
             let mut add_votes_elapsed;

             let root_bank = repair_info.bank_forks.read().unwrap().root_bank();
+            let repair_protocol = serve_repair::get_repair_protocol(root_bank.cluster_type());
             let repairs = {
                 let new_root = root_bank.slot();
@@ -425,17 +422,18 @@ impl RepairService {
             let batch: Vec<(Vec<u8>, SocketAddr)> = {
                 let mut outstanding_requests = outstanding_requests.write().unwrap();
                 repairs
-                    .iter()
+                    .into_iter()
                     .filter_map(|repair_request| {
                         let (to, req) = serve_repair
                             .repair_request(
                                 &repair_info.cluster_slots,
-                                *repair_request,
+                                repair_request,
                                 &mut peers_cache,
                                 &mut repair_stats,
                                 &repair_info.repair_validators,
                                 &mut outstanding_requests,
                                 identity_keypair,
+                                repair_protocol,
                             )
                             .ok()?;
                         Some((req, to))
diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs
index a4fe789e3b..024b18088f 100644
--- a/core/src/repair/serve_repair.rs
+++ b/core/src/repair/serve_repair.rs
@@ -34,6 +34,7 @@ use {
     solana_runtime::bank_forks::BankForks,
     solana_sdk::{
         clock::Slot,
+        genesis_config::ClusterType,
         hash::{Hash, HASH_BYTES},
         packet::PACKET_DATA_SIZE,
         pubkey::{Pubkey, PUBKEY_BYTES},
@@ -212,7 +213,7 @@ impl RepairRequestHeader {
 pub(crate) type Ping = ping_pong::Ping<[u8; REPAIR_PING_TOKEN_SIZE]>;

 /// Window protocol messages
-#[derive(Debug, AbiEnumVisitor, AbiExample, Deserialize, Serialize, strum_macros::Display)]
+#[derive(Debug, AbiEnumVisitor, AbiExample, Deserialize, Serialize)]
 #[frozen_abi(digest = "3VzVe3kMrG6ijkVPyCGeJVA9hQjWcFEZbAQPc5Zizrjm")]
 pub enum RepairProtocol {
     LegacyWindowIndex(LegacyContactInfo, Slot, u64),
@@ -335,10 +336,17 @@ pub struct ServeRepair {
 // Cache entry for repair peers for a slot.
 pub(crate) struct RepairPeers {
     asof: Instant,
-    peers: Vec<(Pubkey, /*ContactInfo.serve_repair:*/ SocketAddr)>,
+    peers: Vec<Node>,
     weighted_index: WeightedIndex<u64>,
 }

+struct Node {
+    pubkey: Pubkey,
+    serve_repair: SocketAddr,
+    #[allow(dead_code)]
+    serve_repair_quic: SocketAddr,
+}
+
 impl RepairPeers {
     fn new(asof: Instant, peers: &[ContactInfo], weights: &[u64]) -> Result<Self> {
         if peers.len() != weights.len() {
@@ -348,8 +356,12 @@ impl RepairPeers {
             .iter()
             .zip(weights)
             .filter_map(|(peer, &weight)| {
-                let addr = peer.serve_repair(Protocol::UDP).ok()?;
-                Some(((*peer.pubkey(), addr), weight))
+                let node = Node {
+                    pubkey: *peer.pubkey(),
+                    serve_repair: peer.serve_repair(Protocol::UDP).ok()?,
+                    serve_repair_quic: peer.serve_repair(Protocol::QUIC).ok()?,
+                };
+                Some((node, weight))
             })
             .unzip();
         if peers.is_empty() {
@@ -363,9 +375,9 @@ impl RepairPeers {
         })
     }

-    fn sample<R: Rng>(&self, rng: &mut R) -> (Pubkey, SocketAddr) {
+    fn sample<R: Rng>(&self, rng: &mut R) -> &Node {
         let index = self.weighted_index.sample(rng);
-        self.peers[index]
+        &self.peers[index]
     }
 }

@@ -508,11 +520,8 @@ impl ServeRepair {
         my_id: &Pubkey,
         socket_addr_space: &SocketAddrSpace,
     ) -> Result<RepairRequestWithMeta> {
-        let request: RepairProtocol = match packet.deserialize_slice(..) {
-            Ok(request) => request,
-            Err(_) => {
-                return Err(Error::from(RepairVerifyError::Malformed));
-            }
+        let Ok(request) = packet.deserialize_slice(..) else {
+            return Err(Error::from(RepairVerifyError::Malformed));
         };
         let from_addr = packet.meta().socket_addr();
         if !ContactInfo::is_valid_address(&from_addr, socket_addr_space) {
@@ -520,7 +529,7 @@ impl ServeRepair {
         }
         Self::verify_signed_packet(my_id, packet, &request)?;
         if request.sender() == my_id {
-            error!("self repair: from_addr={from_addr} my_id={my_id} request={request}");
+            error!("self repair: from_addr={from_addr} my_id={my_id} request={request:?}");
             return Err(Error::from(RepairVerifyError::SelfRepair));
         }
         let stake = *epoch_staked_nodes
@@ -804,7 +813,7 @@ impl ServeRepair {
         let mut last_print = Instant::now();
         let mut stats = ServeRepairStats::default();
         let data_budget = DataBudget::default();
-        loop {
+        while !exit.load(Ordering::Relaxed) {
             let result = self.run_listen(
                 &mut ping_cache,
                 &recycler,
@@ -821,9 +830,6 @@ impl ServeRepair {
                     return;
                 }
             };
-            if exit.load(Ordering::Relaxed) {
-                return;
-            }
             if last_print.elapsed().as_secs() > 2 {
                 self.report_reset_stats(&mut stats);
                 last_print = Instant::now();
@@ -1026,6 +1032,7 @@ impl ServeRepair {
         repair_validators: &Option<HashSet<Pubkey>>,
         outstanding_requests: &mut OutstandingShredRepairs,
         identity_keypair: &Keypair,
+        repair_protocol: Protocol,
     ) -> Result<(SocketAddr, Vec<u8>)> {
         // find a peer that appears to be accepting replication and has the desired slot, as indicated
         // by a valid tvu port location
@@ -1041,11 +1048,11 @@ impl ServeRepair {
                 peers_cache.get(&slot).unwrap()
             }
         };
-        let (peer, addr) = repair_peers.sample(&mut rand::thread_rng());
+        let peer = repair_peers.sample(&mut rand::thread_rng());
         let nonce = outstanding_requests.add_request(repair_request, timestamp());
         let out = self.map_repair_request(
             &repair_request,
-            &peer,
+            &peer.pubkey,
             repair_stats,
             nonce,
             identity_keypair,
@@ -1055,7 +1062,10 @@ impl ServeRepair {
             identity_keypair.pubkey(),
             repair_request
         );
-        Ok((addr, out))
+        match repair_protocol {
+            Protocol::UDP => Ok((peer.serve_repair, out)),
+            Protocol::QUIC => todo!(),
+        }
     }

     pub(crate) fn repair_request_ancestor_hashes_sample_peers(
@@ -1063,6 +1073,7 @@ impl ServeRepair {
         slot: Slot,
         cluster_slots: &ClusterSlots,
         repair_validators: &Option<HashSet<Pubkey>>,
+        repair_protocol: Protocol,
     ) -> Result<Vec<(Pubkey, SocketAddr)>> {
         let repair_peers: Vec<_> = self.repair_peers(repair_validators, slot);
         if repair_peers.is_empty() {
@@ -1076,7 +1087,7 @@ impl ServeRepair {
             .shuffle(&mut rand::thread_rng())
             .map(|i| index[i])
             .filter_map(|i| {
-                let addr = repair_peers[i].serve_repair(Protocol::UDP).ok()?;
+                let addr = repair_peers[i].serve_repair(repair_protocol).ok()?;
                 Some((*repair_peers[i].pubkey(), addr))
             })
             .take(get_ancestor_hash_repair_sample_size())
@@ -1084,7 +1095,8 @@ impl ServeRepair {
         Ok(peers)
     }

-    pub fn repair_request_duplicate_compute_best_peer(
+    #[cfg(test)]
+    pub(crate) fn repair_request_duplicate_compute_best_peer(
         &self,
         slot: Slot,
         cluster_slots: &ClusterSlots,
@@ -1346,6 +1358,11 @@ impl ServeRepair {
     }
 }

+#[inline]
+pub(crate) fn get_repair_protocol(_: ClusterType) -> Protocol {
+    Protocol::UDP
+}
+
 #[cfg(test)]
 mod tests {
     use {
@@ -1703,10 +1720,10 @@ mod tests {
             packet
         };
         let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
-        assert!(matches!(
+        assert_matches!(
             ServeRepair::verify_signed_packet(&my_keypair.pubkey(), &packet, &request),
             Err(Error::RepairVerify(RepairVerifyError::IdMismatch))
-        ));
+        );

         // outside time window
         let packet = {
@@ -1725,10 +1742,10 @@ mod tests {
             packet
         };
         let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
-        assert!(matches!(
+        assert_matches!(
             ServeRepair::verify_signed_packet(&other_keypair.pubkey(), &packet, &request),
             Err(Error::RepairVerify(RepairVerifyError::TimeSkew))
-        ));
+        );

         // bad signature
         let packet = {
@@ -1745,10 +1762,10 @@ mod tests {
             packet
         };
         let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
-        assert!(matches!(
+        assert_matches!(
             ServeRepair::verify_signed_packet(&other_keypair.pubkey(), &packet, &request),
             Err(Error::RepairVerify(RepairVerifyError::SigVerify))
-        ));
+        );
     }

     #[test]
@@ -1901,6 +1918,7 @@ mod tests {
             &None,
             &mut outstanding_requests,
             &identity_keypair,
+            Protocol::UDP, // repair_protocol
         );
         assert_matches!(rv, Err(Error::ClusterInfo(ClusterInfoError::NoPeers)));

@@ -1919,6 +1937,8 @@ mod tests {
         nxt.set_rpc((Ipv4Addr::LOCALHOST, 1241)).unwrap();
         nxt.set_rpc_pubsub((Ipv4Addr::LOCALHOST, 1242)).unwrap();
         nxt.set_serve_repair(serve_repair_addr).unwrap();
+        nxt.set_serve_repair_quic((Ipv4Addr::LOCALHOST, 1237))
+            .unwrap();
         cluster_info.insert_info(nxt.clone());
         let rv = serve_repair
             .repair_request(
@@ -1929,6 +1949,7 @@ mod tests {
                 &None,
                 &mut outstanding_requests,
                 &identity_keypair,
+                Protocol::UDP, // repair_protocol
             )
             .unwrap();
         assert_eq!(nxt.serve_repair(Protocol::UDP).unwrap(), serve_repair_addr);
@@ -1949,6 +1970,8 @@ mod tests {
         nxt.set_rpc((Ipv4Addr::LOCALHOST, 1241)).unwrap();
         nxt.set_rpc_pubsub((Ipv4Addr::LOCALHOST, 1242)).unwrap();
         nxt.set_serve_repair(serve_repair_addr2).unwrap();
+        nxt.set_serve_repair_quic((Ipv4Addr::LOCALHOST, 1237))
+            .unwrap();
         cluster_info.insert_info(nxt);
         let mut one = false;
         let mut two = false;
@@ -1963,6 +1986,7 @@ mod tests {
                 &None,
                 &mut outstanding_requests,
                 &identity_keypair,
+                Protocol::UDP, // repair_protocol
             )
             .unwrap();
         if rv.0 == serve_repair_addr {
@@ -2232,8 +2256,8 @@ mod tests {
         for pubkey in &[solana_sdk::pubkey::new_rand(), *me.pubkey()] {
             let known_validators = Some(vec![*pubkey].into_iter().collect());
             assert!(serve_repair.repair_peers(&known_validators, 1).is_empty());
-            assert!(serve_repair
-                .repair_request(
+            assert_matches!(
+                serve_repair.repair_request(
                     &cluster_slots,
                     ShredRepairType::Shred(0, 0),
                     &mut LruCache::new(100),
@@ -2241,8 +2265,10 @@ mod tests {
                     &known_validators,
                     &mut OutstandingShredRepairs::default(),
                     &identity_keypair,
-                )
-                .is_err());
+                    Protocol::UDP, // repair_protocol
+                ),
+                Err(Error::ClusterInfo(ClusterInfoError::NoPeers))
+            );
         }

         // If known validator exists in gossip, should return repair successfully
         let known_validators = Some(vec![*contact_info2.pubkey()].into_iter().collect());
         let repair_peers = serve_repair.repair_peers(&known_validators, 1);
         assert_eq!(repair_peers.len(), 1);
         assert_eq!(repair_peers[0].pubkey(), contact_info2.pubkey());
-        assert!(serve_repair
-            .repair_request(
+        assert_matches!(
+            serve_repair.repair_request(
                 &cluster_slots,
                 ShredRepairType::Shred(0, 0),
                 &mut LruCache::new(100),
@@ -2259,8 +2285,10 @@ mod tests {
                 &known_validators,
                 &mut OutstandingShredRepairs::default(),
                 &identity_keypair,
-            )
-            .is_ok());
+                Protocol::UDP, // repair_protocol
+            ),
+            Ok(_)
+        );

         // Using no known validators should default to all
         // validator's available in gossip, excluding myself
@@ -2272,8 +2300,8 @@ mod tests {
         assert_eq!(repair_peers.len(), 2);
         assert!(repair_peers.contains(contact_info2.pubkey()));
         assert!(repair_peers.contains(contact_info3.pubkey()));
-        assert!(serve_repair
-            .repair_request(
+        assert_matches!(
+            serve_repair.repair_request(
                 &cluster_slots,
                 ShredRepairType::Shred(0, 0),
                 &mut LruCache::new(100),
@@ -2281,8 +2309,10 @@ mod tests {
                 &None,
                 &mut OutstandingShredRepairs::default(),
                 &identity_keypair,
-            )
-            .is_ok());
+ Protocol::UDP, // repair_protocol + ), + Ok(_) + ); } #[test] diff --git a/core/src/repair/serve_repair_service.rs b/core/src/repair/serve_repair_service.rs index 2bfb145fcb..bd0298aacb 100644 --- a/core/src/repair/serve_repair_service.rs +++ b/core/src/repair/serve_repair_service.rs @@ -59,10 +59,7 @@ impl ServeRepairService { Self { thread_hdls } } - pub fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls { - thread_hdl.join()?; - } - Ok(()) + pub(crate) fn join(self) -> thread::Result<()> { + self.thread_hdls.into_iter().try_for_each(JoinHandle::join) } }
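The pattern the patch introduces is small but repeated at every call site: derive a Protocol once per loop iteration from the root bank's cluster type, thread it down as an explicit argument, and record both the UDP and QUIC serve-repair addresses in the cached peer entry so the cache stays protocol-agnostic (the QUIC arms are still todo!()). Below is a minimal standalone sketch of that shape; Protocol, Node, get_repair_protocol, and repair_addr here are simplified stand-ins for illustration, not the actual solana_gossip/serve_repair API:

use std::net::SocketAddr;

// Simplified stand-in for solana_gossip::contact_info::Protocol.
#[derive(Clone, Copy, Debug)]
enum Protocol {
    UDP,
    QUIC,
}

// Simplified stand-in for the cached peer entry: both serve-repair
// addresses are captured when the cache entry is built, so cached data
// does not depend on which protocol is selected later.
struct Node {
    serve_repair: SocketAddr,      // UDP serve-repair socket
    serve_repair_quic: SocketAddr, // QUIC serve-repair socket
}

// Stand-in for serve_repair::get_repair_protocol: UDP for every cluster
// type for now, so the migration can be flipped per cluster type later.
fn get_repair_protocol() -> Protocol {
    Protocol::UDP
}

// The protocol value is threaded down to the single point where a peer's
// socket address is picked.
fn repair_addr(node: &Node, protocol: Protocol) -> SocketAddr {
    match protocol {
        Protocol::UDP => node.serve_repair,
        Protocol::QUIC => node.serve_repair_quic,
    }
}

fn main() {
    let node = Node {
        serve_repair: "127.0.0.1:8008".parse().unwrap(),
        serve_repair_quic: "127.0.0.1:8009".parse().unwrap(),
    };
    let addr = repair_addr(&node, get_repair_protocol());
    println!("sending repair request to {addr}");
}

Because each call path selects the socket in exactly one match, implementing the QUIC arms and changing what get_repair_protocol returns for a given cluster type is all that is needed to switch transports, without invalidating or re-resolving cached peers.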