From 8fe4f5d0b98fa115305293b4ed40ddf2c4ed20cc Mon Sep 17 00:00:00 2001
From: Brennan
Date: Wed, 17 Jan 2024 14:19:32 -0800
Subject: [PATCH] Shred Repair Request (#34771)

* shred repair admin rpc
---
 core/src/admin_rpc_post_init.rs    |   5 ++
 core/src/repair/repair_service.rs  | 124 ++++++++++++++++++++++++++++-
 core/src/repair/serve_repair.rs    |   4 +-
 core/src/tvu.rs                    |   9 ++-
 core/src/validator.rs              |   8 +-
 core/src/window_service.rs         |   7 +-
 validator/src/admin_rpc_service.rs |  36 +++++++++
 validator/src/cli.rs               |  27 +++++++
 validator/src/main.rs              |  18 +++++
 9 files changed, 230 insertions(+), 8 deletions(-)

diff --git a/core/src/admin_rpc_post_init.rs b/core/src/admin_rpc_post_init.rs
index 3acd0f843..a7a660043 100644
--- a/core/src/admin_rpc_post_init.rs
+++ b/core/src/admin_rpc_post_init.rs
@@ -1,9 +1,11 @@
 use {
+    crate::repair::{outstanding_requests, serve_repair},
     solana_gossip::cluster_info::ClusterInfo,
     solana_runtime::bank_forks::BankForks,
     solana_sdk::{pubkey::Pubkey, quic::NotifyKeyUpdate},
     std::{
         collections::HashSet,
+        net::UdpSocket,
         sync::{Arc, RwLock},
     },
 };
@@ -15,4 +17,7 @@ pub struct AdminRpcRequestMetadataPostInit {
     pub vote_account: Pubkey,
     pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
     pub notifies: Vec<Arc<dyn NotifyKeyUpdate + Sync + Send>>,
+    pub repair_socket: Arc<UdpSocket>,
+    pub outstanding_repair_requests:
+        Arc<RwLock<outstanding_requests::OutstandingRequests<serve_repair::ShredRepairType>>>,
 }
diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs
index 36ba4978e..509b1e1b6 100644
--- a/core/src/repair/repair_service.rs
+++ b/core/src/repair/repair_service.rs
@@ -15,11 +15,15 @@ use {
             outstanding_requests::OutstandingRequests,
             quic_endpoint::LocalRequest,
             repair_weight::RepairWeight,
-            serve_repair::{self, ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY},
+            serve_repair::{
+                self, RepairProtocol, RepairRequestHeader, ServeRepair, ShredRepairType,
+                REPAIR_PEERS_CACHE_CAPACITY,
+            },
         },
     },
     crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender},
     lru::LruCache,
+    solana_client::connection_cache::Protocol,
     solana_gossip::cluster_info::ClusterInfo,
     solana_ledger::{
         blockstore::{Blockstore, SlotMeta},
@@ -678,6 +682,70 @@ impl RepairService {
         }
     }

+    pub fn request_repair_for_shred_from_peer(
+        cluster_info: Arc<ClusterInfo>,
+        pubkey: Pubkey,
+        slot: u64,
+        shred_index: u64,
+        repair_socket: &UdpSocket,
+        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
+    ) {
+        let peer_repair_addr = cluster_info
+            .lookup_contact_info(&pubkey, |node| node.serve_repair(Protocol::UDP))
+            .unwrap()
+            .unwrap();
+        Self::request_repair_for_shred_from_address(
+            cluster_info,
+            pubkey,
+            peer_repair_addr,
+            slot,
+            shred_index,
+            repair_socket,
+            outstanding_repair_requests,
+        );
+    }
+
+    fn request_repair_for_shred_from_address(
+        cluster_info: Arc<ClusterInfo>,
+        pubkey: Pubkey,
+        address: SocketAddr,
+        slot: u64,
+        shred_index: u64,
+        repair_socket: &UdpSocket,
+        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
+    ) {
+        // Setup repair request
+        let identity_keypair = cluster_info.keypair();
+        let repair_request = ShredRepairType::Shred(slot, shred_index);
+        let nonce = outstanding_repair_requests
+            .write()
+            .unwrap()
+            .add_request(repair_request, timestamp());
+
+        // Create repair request
+        let header = RepairRequestHeader::new(cluster_info.id(), pubkey, timestamp(), nonce);
+        let request_proto = RepairProtocol::WindowIndex {
+            header,
+            slot,
+            shred_index,
+        };
+        let packet_buf =
+            ServeRepair::repair_proto_to_bytes(&request_proto, &identity_keypair).unwrap();
+
+        // Prepare packet batch to send
+        let reqs = vec![(packet_buf, address)];
+
+        // Send packet batch
+        match batch_send(repair_socket, &reqs[..]) {
+            Ok(()) => {
+                trace!("successfully sent repair request!");
+            }
+            Err(SendPktsError::IoError(err, _num_failed)) => {
+                error!("batch_send failed to send packet - error = {:?}", err);
+            }
+        }
+    }
+
     /// Generate repairs for all slots `x` in the repair_range.start <= x <= repair_range.end
     #[cfg(test)]
     pub fn generate_repairs_in_range(
@@ -859,6 +927,7 @@ pub(crate) fn sleep_shred_deferment_period() {
 mod test {
     use {
         super::*,
+        crate::repair::quic_endpoint::RemoteRequest,
         solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
         solana_ledger::{
             blockstore::{
@@ -883,6 +952,59 @@ mod test {
         ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified)
     }

+    #[test]
+    pub fn test_request_repair_for_shred_from_address() {
+        // Setup cluster and repair info
+        let cluster_info = Arc::new(new_test_cluster_info());
+        let pubkey = cluster_info.id();
+        let slot = 100;
+        let shred_index = 50;
+        let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
+        let address = reader.local_addr().unwrap();
+        let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
+        let outstanding_repair_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default()));
+
+        // Send a repair request
+        RepairService::request_repair_for_shred_from_address(
+            cluster_info.clone(),
+            pubkey,
+            address,
+            slot,
+            shred_index,
+            &sender,
+            outstanding_repair_requests,
+        );
+
+        // Receive and translate repair packet
+        let mut packets = vec![solana_sdk::packet::Packet::default(); 1];
+        let _recv_count = solana_streamer::recvmmsg::recv_mmsg(&reader, &mut packets[..]).unwrap();
+        let packet = &packets[0];
+        let Some(bytes) = packet.data(..).map(Vec::from) else {
+            panic!("packet data not found");
+        };
+        let remote_request = RemoteRequest {
+            remote_pubkey: None,
+            remote_address: packet.meta().socket_addr(),
+            bytes,
+            response_sender: None,
+        };
+
+        // Deserialize and check the request
+        let deserialized =
+            serve_repair::deserialize_request::<RepairProtocol>(&remote_request).unwrap();
+        match deserialized {
+            RepairProtocol::WindowIndex {
+                slot: deserialized_slot,
+                shred_index: deserialized_shred_index,
+                ..
+            } => {
+                assert_eq!(deserialized_slot, slot);
+                assert_eq!(deserialized_shred_index, shred_index);
+            }
+            _ => panic!("unexpected repair protocol"),
+        }
+    }
+
     #[test]
     pub fn test_repair_orphan() {
         let ledger_path = get_tmp_ledger_path_auto_delete!();
diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs
index a12848f2e..a4c676bf7 100644
--- a/core/src/repair/serve_repair.rs
+++ b/core/src/repair/serve_repair.rs
@@ -1384,7 +1384,9 @@ pub(crate) fn get_repair_protocol(_: ClusterType) -> Protocol {
     Protocol::UDP
 }

-fn deserialize_request<T>(request: &RemoteRequest) -> std::result::Result<T, bincode::Error>
+pub(crate) fn deserialize_request<T>(
+    request: &RemoteRequest,
+) -> std::result::Result<T, bincode::Error>
 where
     T: serde::de::DeserializeOwned,
 {
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 2fe7e08dd..bfdb258b2 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -14,7 +14,10 @@ use {
         consensus::{tower_storage::TowerStorage, Tower},
         cost_update_service::CostUpdateService,
         drop_bank_service::DropBankService,
-        repair::{quic_endpoint::LocalRequest, repair_service::RepairInfo},
+        repair::{
+            quic_endpoint::LocalRequest,
+            repair_service::{OutstandingShredRepairs, RepairInfo},
+        },
         replay_stage::{ReplayStage, ReplayStageConfig},
         rewards_recorder_service::RewardsRecorderSender,
         shred_fetch_stage::ShredFetchStage,
@@ -138,6 +141,7 @@ impl Tvu {
         turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
         turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
         repair_quic_endpoint_sender: AsyncSender<LocalRequest>,
+        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
     ) -> Result<Self, String> {
         let TvuSockets {
             repair: repair_socket,
@@ -228,6 +232,7 @@ impl Tvu {
                 ancestor_hashes_replay_update_receiver,
                 dumped_slots_receiver,
                 popular_pruned_forks_sender,
+                outstanding_repair_requests,
             )
         };

@@ -442,6 +447,7 @@ pub mod tests {
         let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
         let max_complete_rewards_slot = Arc::new(AtomicU64::default());
         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
+        let outstanding_repair_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
         let tvu = Tvu::new(
             &vote_keypair.pubkey(),
             Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
@@ -496,6 +502,7 @@ pub mod tests {
             turbine_quic_endpoint_sender,
             turbine_quic_endpoint_receiver,
             repair_quic_endpoint_sender,
+            outstanding_repair_requests,
         )
         .expect("assume success");
         exit.store(true, Ordering::Relaxed);
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 13c454631..5330f47a7 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -1255,13 +1255,16 @@ impl Validator {
         };
         let last_vote = tower.last_vote();

+        let outstanding_repair_requests =
+            Arc::<RwLock<repair::repair_service::OutstandingShredRepairs>>::default();
+
         let tvu = Tvu::new(
             vote_account,
             authorized_voter_keypairs,
             &bank_forks,
             &cluster_info,
             TvuSockets {
-                repair: node.sockets.repair,
+                repair: node.sockets.repair.try_clone().unwrap(),
                 retransmit: node.sockets.retransmit_sockets,
                 fetch: node.sockets.tvu,
                 ancestor_hashes_requests: node.sockets.ancestor_hashes_requests,
@@ -1307,6 +1310,7 @@ impl Validator {
             turbine_quic_endpoint_sender.clone(),
             turbine_quic_endpoint_receiver,
             repair_quic_endpoint_sender,
+            outstanding_repair_requests.clone(),
         )?;

         if in_wen_restart {
@@ -1383,6 +1387,8 @@ impl Validator {
             vote_account: *vote_account,
             repair_whitelist: config.repair_whitelist.clone(),
             notifies: key_notifies,
+            repair_socket: Arc::new(node.sockets.repair),
+            outstanding_repair_requests,
         });

         Ok(Self {
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 49418c826..aa801b7eb 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -386,9 +386,8 @@ impl WindowService {
         ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
         dumped_slots_receiver: DumpedSlotsReceiver,
         popular_pruned_forks_sender: PopularPrunedForksSender,
+        outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
     ) -> WindowService {
-        let outstanding_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
-
         let cluster_info = repair_info.cluster_info.clone();
         let bank_forks = repair_info.bank_forks.clone();

@@ -401,7 +400,7 @@ impl WindowService {
             repair_quic_endpoint_response_sender,
             repair_info,
             verified_vote_receiver,
-            outstanding_requests.clone(),
+            outstanding_repair_requests.clone(),
             ancestor_hashes_replay_update_receiver,
             dumped_slots_receiver,
             popular_pruned_forks_sender,
@@ -426,7 +425,7 @@ impl WindowService {
             duplicate_sender,
             completed_data_sets_sender,
             retransmit_sender,
-            outstanding_requests,
+            outstanding_repair_requests,
         );

         WindowService {
diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs
index 67f2309a9..78ee6a4b3 100644
--- a/validator/src/admin_rpc_service.rs
+++ b/validator/src/admin_rpc_service.rs
@@ -13,6 +13,7 @@ use {
     solana_core::{
         admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
         consensus::{tower_storage::TowerStorage, Tower},
+        repair::repair_service,
         validator::ValidatorStartProgress,
     },
     solana_geyser_plugin_manager::GeyserPluginManagerRequest,
@@ -207,6 +208,15 @@ pub trait AdminRpc {
     #[rpc(meta, name = "contactInfo")]
     fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;

+    #[rpc(meta, name = "repairShredFromPeer")]
+    fn repair_shred_from_peer(
+        &self,
+        meta: Self::Metadata,
+        pubkey: Pubkey,
+        slot: u64,
+        shred_index: u64,
+    ) -> Result<()>;
+
     #[rpc(meta, name = "repairWhitelist")]
     fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist>;

@@ -487,6 +497,28 @@ impl AdminRpc for AdminRpcImpl {
         meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
     }

+    fn repair_shred_from_peer(
+        &self,
+        meta: Self::Metadata,
+        pubkey: Pubkey,
+        slot: u64,
+        shred_index: u64,
+    ) -> Result<()> {
+        debug!("repair_shred_from_peer request received");
+
+        meta.with_post_init(|post_init| {
+            repair_service::RepairService::request_repair_for_shred_from_peer(
+                post_init.cluster_info.clone(),
+                pubkey,
+                slot,
+                shred_index,
+                &post_init.repair_socket.clone(),
+                post_init.outstanding_repair_requests.clone(),
+            );
+            Ok(())
+        })
+    }
+
     fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist> {
         debug!("repair_whitelist request received");

@@ -895,6 +927,10 @@ mod tests {
                 vote_account,
                 repair_whitelist,
                 notifies: Vec::new(),
+                repair_socket: Arc::new(std::net::UdpSocket::bind("0.0.0.0:0").unwrap()),
+                outstanding_repair_requests: Arc::<
+                    RwLock<repair_service::OutstandingShredRepairs>,
+                >::default(),
             }))),
             staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
             rpc_to_plugin_manager_sender: None,
diff --git a/validator/src/cli.rs b/validator/src/cli.rs
index d065a3524..892398cc7 100644
--- a/validator/src/cli.rs
+++ b/validator/src/cli.rs
@@ -1497,6 +1497,33 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
                         .help("Output display mode")
                 )
         )
+        .subcommand(SubCommand::with_name("repair-shred-from-peer")
+            .about("Request a repair from the specified validator")
+            .arg(
+                Arg::with_name("pubkey")
+                    .long("pubkey")
+                    .value_name("PUBKEY")
+                    .takes_value(true)
+                    .validator(is_pubkey)
+                    .help("Identity pubkey of the validator to repair from")
+            )
+            .arg(
+                Arg::with_name("slot")
+                    .long("slot")
+                    .value_name("SLOT")
+                    .takes_value(true)
+                    .validator(is_parsable::<u64>)
+                    .help("Slot to repair")
+            )
+            .arg(
+                Arg::with_name("shred")
+                    .long("shred")
+                    .value_name("SHRED")
+                    .takes_value(true)
+                    .validator(is_parsable::<u64>)
+                    .help("Shred to repair")
+            )
+        )
         .subcommand(
             SubCommand::with_name("repair-whitelist")
                 .about("Manage the validator's repair protocol whitelist")
diff --git a/validator/src/main.rs b/validator/src/main.rs
index 902a73df4..781228e9b 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -793,6 +793,24 @@ pub fn main() {
                 });
             return;
         }
+        ("repair-shred-from-peer", Some(subcommand_matches)) => {
+            let pubkey = value_t_or_exit!(subcommand_matches, "pubkey", Pubkey);
+            let slot = value_t_or_exit!(subcommand_matches, "slot", u64);
+            let shred_index = value_t_or_exit!(subcommand_matches, "shred", u64);
+            let admin_client = admin_rpc_service::connect(&ledger_path);
+            admin_rpc_service::runtime()
+                .block_on(async move {
+                    admin_client
+                        .await?
+                        .repair_shred_from_peer(pubkey, slot, shred_index)
+                        .await
+                })
+                .unwrap_or_else(|err| {
+                    println!("repair shred from peer failed: {err}");
+                    exit(1);
+                });
+            return;
+        }
         ("repair-whitelist", Some(repair_whitelist_subcommand_matches)) => {
             match repair_whitelist_subcommand_matches.subcommand() {
                 ("get", Some(subcommand_matches)) => {