diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs
index 9804ba8be2..f07960d9f7 100644
--- a/core/src/ancestor_hashes_service.rs
+++ b/core/src/ancestor_hashes_service.rs
@@ -21,6 +21,7 @@ use {
     solana_sdk::{
         clock::{Slot, SLOT_MS},
         pubkey::Pubkey,
+        signer::keypair::Keypair,
         timing::timestamp,
     },
     solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
@@ -451,7 +452,10 @@ impl AncestorHashesService {
         ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
         retryable_slots_receiver: RetryableSlotsReceiver,
     ) -> JoinHandle<()> {
-        let serve_repair = ServeRepair::new(repair_info.cluster_info.clone());
+        let serve_repair = ServeRepair::new(
+            repair_info.cluster_info.clone(),
+            repair_info.bank_forks.clone(),
+        );
         let mut repair_stats = AncestorRepairRequestsStats::default();
 
         let mut dead_slot_pool = HashSet::new();
@@ -540,6 +544,8 @@ impl AncestorHashesService {
         // Keep around the last second of requests in the throttler.
         request_throttle.retain(|request_time| *request_time > (timestamp() - 1000));
 
+        let identity_keypair: &Keypair = &repair_info.cluster_info.keypair().clone();
+
         let number_of_allowed_requests =
             MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND.saturating_sub(request_throttle.len());
 
@@ -563,6 +569,8 @@ impl AncestorHashesService {
                 slot,
                 repair_stats,
                 outstanding_requests,
+                identity_keypair,
+                &root_bank,
             ) {
                 request_throttle.push(timestamp());
                 repairable_dead_slot_pool.take(&slot).unwrap();
@@ -627,6 +635,7 @@ impl AncestorHashesService {
     /// Returns true if a request was successfully made and the status
     /// added to `ancestor_hashes_request_statuses`
+    #[allow(clippy::too_many_arguments)]
     fn initiate_ancestor_hashes_requests_for_duplicate_slot(
         ancestor_hashes_request_statuses: &DashMap<Slot, DeadSlotAncestorRequestStatus>,
         ancestor_hashes_request_socket: &UdpSocket,
@@ -636,6 +645,8 @@ impl AncestorHashesService {
         duplicate_slot: Slot,
         repair_stats: &mut AncestorRepairRequestsStats,
         outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
+        identity_keypair: &Keypair,
+        root_bank: &Bank,
     ) -> bool {
         let sampled_validators = serve_repair.repair_request_ancestor_hashes_sample_peers(
             duplicate_slot,
@@ -652,8 +663,13 @@ impl AncestorHashesService {
                 .write()
                 .unwrap()
                 .add_request(AncestorHashesRepairType(duplicate_slot), timestamp());
-            let request_bytes =
-                serve_repair.ancestor_repair_request_bytes(duplicate_slot, nonce);
+            let request_bytes = serve_repair.ancestor_repair_request_bytes(
+                identity_keypair,
+                root_bank,
+                pubkey,
+                duplicate_slot,
+                nonce,
+            );
             if let Ok(request_bytes) = request_bytes {
                 let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket_addr);
             }
@@ -877,14 +893,17 @@ mod test {
         fn new(slot_to_query: Slot) -> Self {
             assert!(slot_to_query >= MAX_ANCESTOR_RESPONSES as Slot);
+            let vote_simulator = VoteSimulator::new(3);
             let responder_node = Node::new_localhost();
             let cluster_info = ClusterInfo::new(
                 responder_node.info.clone(),
                 Arc::new(Keypair::new()),
                 SocketAddrSpace::Unspecified,
             );
-            let responder_serve_repair =
-                Arc::new(RwLock::new(ServeRepair::new(Arc::new(cluster_info))));
+            let responder_serve_repair = Arc::new(RwLock::new(ServeRepair::new(
+                Arc::new(cluster_info),
+                vote_simulator.bank_forks,
+            )));
 
             // Set up thread to give us responses
             let ledger_path = get_tmp_ledger_path!();
@@ -968,7 +987,8 @@ mod test {
                 Arc::new(Keypair::new()),
                 SocketAddrSpace::Unspecified,
             ));
-            let requester_serve_repair = ServeRepair::new(requester_cluster_info.clone());
+            let requester_serve_repair =
+                ServeRepair::new(requester_cluster_info.clone(), bank_forks.clone());
             let (duplicate_slots_reset_sender, _duplicate_slots_reset_receiver) = unbounded();
             let repair_info = RepairInfo {
                 bank_forks,
@@ -1074,6 +1094,7 @@ mod test {
         } = ManageAncestorHashesState::new(vote_simulator.bank_forks);
 
         let RepairInfo {
+            bank_forks,
             cluster_info: requester_cluster_info,
             cluster_slots,
             repair_validators,
@@ -1089,6 +1110,8 @@ mod test {
             dead_slot,
             &mut repair_stats,
             &outstanding_requests,
+            &requester_cluster_info.keypair(),
+            &bank_forks.read().unwrap().root_bank(),
         );
         assert!(ancestor_hashes_request_statuses.is_empty());
@@ -1106,6 +1129,8 @@ mod test {
             dead_slot,
             &mut repair_stats,
             &outstanding_requests,
+            &requester_cluster_info.keypair(),
+            &bank_forks.read().unwrap().root_bank(),
         );
 
         assert_eq!(ancestor_hashes_request_statuses.len(), 1);
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index b3e8d5b479..018824c793 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -17,7 +17,10 @@ use {
     solana_ledger::blockstore::{Blockstore, SlotMeta},
     solana_measure::measure::Measure,
     solana_runtime::{bank_forks::BankForks, contains::Contains},
-    solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey},
+    solana_sdk::{
+        clock::Slot, epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey,
+        signer::keypair::Keypair,
+    },
     solana_streamer::sendmmsg::{batch_send, SendPktsError},
     std::{
         collections::{HashMap, HashSet},
@@ -246,7 +249,10 @@ impl RepairService {
         outstanding_requests: &RwLock<OutstandingShredRepairs>,
     ) {
         let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
-        let serve_repair = ServeRepair::new(repair_info.cluster_info.clone());
+        let serve_repair = ServeRepair::new(
+            repair_info.cluster_info.clone(),
+            repair_info.bank_forks.clone(),
+        );
         let id = repair_info.cluster_info.id();
         let mut repair_stats = RepairStats::default();
         let mut repair_timing = RepairTiming::default();
@@ -265,8 +271,11 @@ impl RepairService {
             let mut get_votes_elapsed;
             let mut add_votes_elapsed;
 
+            let root_bank = repair_info.bank_forks.read().unwrap().root_bank();
+            let sign_repair_requests_feature_epoch =
+                ServeRepair::sign_repair_requests_activated_epoch(&root_bank);
+
             let repairs = {
-                let root_bank = repair_info.bank_forks.read().unwrap().root_bank().clone();
                 let new_root = root_bank.slot();
 
                 // Purge outdated slots from the weighting heuristic
@@ -314,12 +323,24 @@ impl RepairService {
                 repairs
             };
 
+            let identity_keypair: &Keypair = &repair_info.cluster_info.keypair().clone();
+
             let mut build_repairs_batch_elapsed = Measure::start("build_repairs_batch_elapsed");
             let batch: Vec<(Vec<u8>, SocketAddr)> = {
                 let mut outstanding_requests = outstanding_requests.write().unwrap();
                 repairs
                     .iter()
                     .filter_map(|repair_request| {
+                        let sign_repair_request = ServeRepair::should_sign_repair_request(
+                            repair_request.slot(),
+                            &root_bank,
+                            sign_repair_requests_feature_epoch,
+                        );
+                        let maybe_keypair = if sign_repair_request {
+                            Some(identity_keypair)
+                        } else {
+                            None
+                        };
                         let (to, req) = serve_repair
                             .repair_request(
                                 &repair_info.cluster_slots,
@@ -328,6 +349,7 @@ impl RepairService {
                                 &mut repair_stats,
                                 &repair_info.repair_validators,
                                 &mut outstanding_requests,
+                                maybe_keypair,
                             )
                             .ok()?;
                         Some((req, to))
@@ -653,8 +675,13 @@ impl RepairService {
         repair_stats: &mut RepairStats,
         nonce: Nonce,
     ) -> Result<()> {
-        let req =
-            serve_repair.map_repair_request(repair_type, repair_pubkey, repair_stats, nonce)?;
+        let req = serve_repair.map_repair_request(
+            repair_type,
+            repair_pubkey,
+            repair_stats,
+            nonce,
+            None,
+        )?;
         repair_socket.send_to(&req, to)?;
         Ok(())
     }
@@ -722,9 +749,11 @@ mod test {
             blockstore::{
                 make_chaining_slot_entries, make_many_slot_entries, make_slot_entries, Blockstore,
             },
+            genesis_utils::{create_genesis_config, GenesisConfigInfo},
             get_tmp_ledger_path,
             shred::max_ticks_per_n_shreds,
         },
+        solana_runtime::bank::Bank,
         solana_sdk::signature::Keypair,
         solana_streamer::socket::SocketAddrSpace,
         std::collections::HashSet,
@@ -1044,11 +1073,16 @@ mod test {
     #[test]
     pub fn test_generate_and_send_duplicate_repairs() {
+        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
+        let bank = Bank::new_for_tests(&genesis_config);
+        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
         let blockstore_path = get_tmp_ledger_path!();
         let blockstore = Blockstore::open(&blockstore_path).unwrap();
         let cluster_slots = ClusterSlots::default();
-        let serve_repair =
-            ServeRepair::new(Arc::new(new_test_cluster_info(Node::new_localhost().info)));
+        let serve_repair = ServeRepair::new(
+            Arc::new(new_test_cluster_info(Node::new_localhost().info)),
+            bank_forks,
+        );
         let mut duplicate_slot_repair_statuses = HashMap::new();
         let dead_slot = 9;
         let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
@@ -1127,12 +1161,15 @@ mod test {
     #[test]
     pub fn test_update_duplicate_slot_repair_addr() {
+        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
+        let bank = Bank::new_for_tests(&genesis_config);
+        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
         let dummy_addr = Some((
             Pubkey::default(),
             UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap(),
         ));
         let cluster_info = Arc::new(new_test_cluster_info(Node::new_localhost().info));
-        let serve_repair = ServeRepair::new(cluster_info.clone());
+        let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks);
         let valid_repair_peer = Node::new_localhost().info;
 
         // Signal that this peer has confirmed the dead slot, and is thus
diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs
index 5271aff0fa..9020d70b5a 100644
--- a/core/src/serve_repair.rs
+++ b/core/src/serve_repair.rs
@@ -16,25 +16,39 @@ use {
     solana_gossip::{
         cluster_info::{ClusterInfo, ClusterInfoError},
         contact_info::ContactInfo,
+        ping_pong::{self, PingCache, Pong},
         weighted_shuffle::WeightedShuffle,
     },
     solana_ledger::{
         ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash},
         blockstore::Blockstore,
-        shred::{Nonce, Shred, SIZE_OF_NONCE},
+        shred::{Nonce, Shred, ShredFetchStats, SIZE_OF_NONCE},
     },
     solana_metrics::inc_new_counter_debug,
     solana_perf::{
         data_budget::DataBudget,
-        packet::{PacketBatch, PacketBatchRecycler},
+        packet::{Packet, PacketBatch, PacketBatchRecycler},
     },
+    solana_runtime::{bank::Bank, bank_forks::BankForks},
     solana_sdk::{
-        clock::Slot, hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms,
+        clock::Slot,
+        feature_set::sign_repair_requests,
+        hash::{Hash, HASH_BYTES},
+        packet::PACKET_DATA_SIZE,
+        pubkey::{Pubkey, PUBKEY_BYTES},
+        signature::{Signable, Signature, Signer, SIGNATURE_BYTES},
+        signer::keypair::Keypair,
+        stake_history::Epoch,
+        timing::{duration_as_ms, timestamp},
+    },
+    solana_streamer::{
+        sendmmsg::{batch_send, SendPktsError},
+        socket::SocketAddrSpace,
+        streamer::{PacketBatchReceiver, PacketBatchSender},
     },
-    solana_streamer::streamer::{PacketBatchReceiver, PacketBatchSender},
     std::{
         collections::HashSet,
-        net::SocketAddr,
+        net::{SocketAddr, UdpSocket},
         sync::{
             atomic::{AtomicBool, Ordering},
             Arc, RwLock,
@@ -59,6 +73,14 @@ pub const MAX_ANCESTOR_BYTES_IN_PACKET: usize =
     4 /*slot_hash length*/;
 pub const MAX_ANCESTOR_RESPONSES: usize =
     MAX_ANCESTOR_BYTES_IN_PACKET / std::mem::size_of::<SlotHash>();
 
+/// Number of bytes in the randomly generated token sent with ping messages.
+pub(crate) const REPAIR_PING_TOKEN_SIZE: usize = HASH_BYTES;
+pub const REPAIR_PING_CACHE_CAPACITY: usize = 65536;
+pub const REPAIR_PING_CACHE_TTL: Duration = Duration::from_secs(1280);
+pub(crate) const REPAIR_RESPONSE_SERIALIZED_PING_BYTES: usize =
+    4 /*enum discriminator*/ + PUBKEY_BYTES + REPAIR_PING_TOKEN_SIZE + SIGNATURE_BYTES;
+const SIGNED_REPAIR_TIME_WINDOW: Duration = Duration::from_secs(60 * 10); // 10 min
+
 #[cfg(test)]
 static_assertions::const_assert_eq!(MAX_ANCESTOR_RESPONSES, 30);
@@ -143,35 +165,143 @@ impl RequestResponse for AncestorHashesRepairType {
 }
 
 #[derive(Default)]
-pub struct ServeRepairStats {
-    pub total_requests: usize,
-    pub dropped_requests: usize,
-    pub total_dropped_response_packets: usize,
-    pub total_response_packets: usize,
-    pub total_response_bytes: usize,
-    pub processed: usize,
-    pub self_repair: usize,
-    pub window_index: usize,
-    pub highest_window_index: usize,
-    pub orphan: usize,
-    pub ancestor_hashes: usize,
+struct ServeRepairStats {
+    total_requests: usize,
+    dropped_requests: usize,
+    total_dropped_response_packets: usize,
+    total_response_packets: usize,
+    total_response_bytes: usize,
+    processed: usize,
+    self_repair: usize,
+    window_index: usize,
+    highest_window_index: usize,
+    orphan: usize,
+    pong: usize,
+    ancestor_hashes: usize,
+    pings_required: usize,
+    err_time_skew: usize,
+    err_malformed: usize,
+    err_sig_verify: usize,
+    err_unsigned: usize,
+    err_id_mismatch: usize,
 }
 
+#[derive(Debug, Serialize, Deserialize)]
+pub struct RepairRequestHeader {
+    signature: Signature,
+    sender: Pubkey,
+    recipient: Pubkey,
+    timestamp: u64,
+    nonce: Nonce,
+}
+
+impl RepairRequestHeader {
+    pub fn new(sender: Pubkey, recipient: Pubkey, timestamp: u64, nonce: Nonce) -> Self {
+        Self {
+            signature: Signature::default(),
+            sender,
+            recipient,
+            timestamp,
+            nonce,
+        }
+    }
+}
+
+pub(crate) type Ping = ping_pong::Ping<[u8; REPAIR_PING_TOKEN_SIZE]>;
+
 /// Window protocol messages
 #[derive(Serialize, Deserialize, Debug)]
 pub enum RepairProtocol {
-    WindowIndex(ContactInfo, Slot, u64),
-    HighestWindowIndex(ContactInfo, Slot, u64),
-    Orphan(ContactInfo, Slot),
-    WindowIndexWithNonce(ContactInfo, Slot, u64, Nonce),
-    HighestWindowIndexWithNonce(ContactInfo, Slot, u64, Nonce),
-    OrphanWithNonce(ContactInfo, Slot, Nonce),
-    AncestorHashes(ContactInfo, Slot, Nonce),
+    LegacyWindowIndex(ContactInfo, Slot, u64),
+    LegacyHighestWindowIndex(ContactInfo, Slot, u64),
+    LegacyOrphan(ContactInfo, Slot),
+    LegacyWindowIndexWithNonce(ContactInfo, Slot, u64, Nonce),
+    LegacyHighestWindowIndexWithNonce(ContactInfo, Slot, u64, Nonce),
+    LegacyOrphanWithNonce(ContactInfo, Slot, Nonce),
+    LegacyAncestorHashes(ContactInfo, Slot, Nonce),
+    Pong(ping_pong::Pong),
+    WindowIndex {
+        header: RepairRequestHeader,
+        slot: Slot,
+        shred_index: u64,
+    },
+    HighestWindowIndex {
+        header: RepairRequestHeader,
+        slot: Slot,
+        shred_index: u64,
+    },
+    Orphan {
+        header: RepairRequestHeader,
+        slot: Slot,
+    },
+    AncestorHashes {
+        header: RepairRequestHeader,
+        slot: Slot,
+    },
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+enum RepairResponse {
+    Ping(Ping),
+}
+
+impl RepairProtocol {
+    fn sender(&self) -> &Pubkey {
+        match self {
+            Self::LegacyWindowIndex(ci, _, _) => &ci.id,
+            Self::LegacyHighestWindowIndex(ci, _, _) => &ci.id,
+            Self::LegacyOrphan(ci, _) => &ci.id,
+            Self::LegacyWindowIndexWithNonce(ci, _, _, _) => &ci.id,
+            Self::LegacyHighestWindowIndexWithNonce(ci, _, _, _) => &ci.id,
+            Self::LegacyOrphanWithNonce(ci, _, _) => &ci.id,
+            Self::LegacyAncestorHashes(ci, _, _) => &ci.id,
+            Self::Pong(pong) => pong.from(),
+            Self::WindowIndex { header, .. } => &header.sender,
+            Self::HighestWindowIndex { header, .. } => &header.sender,
+            Self::Orphan { header, .. } => &header.sender,
+            Self::AncestorHashes { header, .. } => &header.sender,
+        }
+    }
+
+    fn supports_signature(&self) -> bool {
+        match self {
+            Self::LegacyWindowIndex(_, _, _)
+            | Self::LegacyHighestWindowIndex(_, _, _)
+            | Self::LegacyOrphan(_, _)
+            | Self::LegacyWindowIndexWithNonce(_, _, _, _)
+            | Self::LegacyHighestWindowIndexWithNonce(_, _, _, _)
+            | Self::LegacyOrphanWithNonce(_, _, _)
+            | Self::LegacyAncestorHashes(_, _, _) => false,
+            Self::Pong(_)
+            | Self::WindowIndex { .. }
+            | Self::HighestWindowIndex { .. }
+            | Self::Orphan { .. }
+            | Self::AncestorHashes { .. } => true,
+        }
+    }
+
+    fn requires_ping_check(&self) -> bool {
+        match self {
+            Self::LegacyWindowIndex(_, _, _)
+            | Self::LegacyHighestWindowIndex(_, _, _)
+            | Self::LegacyOrphan(_, _)
+            | Self::LegacyWindowIndexWithNonce(_, _, _, _)
+            | Self::LegacyHighestWindowIndexWithNonce(_, _, _, _)
+            | Self::LegacyOrphanWithNonce(_, _, _)
+            | Self::LegacyAncestorHashes(_, _, _)
+            | Self::Pong(_)
+            | Self::AncestorHashes { .. } => false,
+            Self::WindowIndex { .. } | Self::HighestWindowIndex { .. } | Self::Orphan { .. } => {
+                true
+            }
+        }
+    }
 }
 
 #[derive(Clone)]
 pub struct ServeRepair {
     cluster_info: Arc<ClusterInfo>,
+    bank_forks: Arc<RwLock<BankForks>>,
 }
 
 // Cache entry for repair peers for a slot.
@@ -208,8 +338,11 @@ impl RepairPeers {
 }
 
 impl ServeRepair {
-    pub fn new(cluster_info: Arc<ClusterInfo>) -> Self {
-        Self { cluster_info }
+    pub fn new(cluster_info: Arc<ClusterInfo>, bank_forks: Arc<RwLock<BankForks>>) -> Self {
+        Self {
+            cluster_info,
+            bank_forks,
+        }
     }
 
     fn my_info(&self) -> ContactInfo {
@@ -220,47 +353,29 @@ impl ServeRepair {
         self.cluster_info.id()
     }
 
-    fn get_repair_sender(request: &RepairProtocol) -> &ContactInfo {
-        match request {
-            RepairProtocol::WindowIndex(ref from, _, _) => from,
-            RepairProtocol::HighestWindowIndex(ref from, _, _) => from,
-            RepairProtocol::Orphan(ref from, _) => from,
-            RepairProtocol::WindowIndexWithNonce(ref from, _, _, _) => from,
-            RepairProtocol::HighestWindowIndexWithNonce(ref from, _, _, _) => from,
-            RepairProtocol::OrphanWithNonce(ref from, _, _) => from,
-            RepairProtocol::AncestorHashes(ref from, _, _) => from,
-        }
-    }
-
     fn handle_repair(
-        me: &Arc<RwLock<Self>>,
         recycler: &PacketBatchRecycler,
         from_addr: &SocketAddr,
         blockstore: Option<&Arc<Blockstore>>,
         request: RepairProtocol,
         stats: &mut ServeRepairStats,
+        ping_cache: &mut PingCache,
     ) -> Option<PacketBatch> {
         let now = Instant::now();
-
-        let my_id = me.read().unwrap().my_id();
-        //TODO: verify `from` is signed
-        let from = Self::get_repair_sender(&request);
-        if from.id == my_id {
-            stats.self_repair += 1;
-            return None;
-        }
-
         let (res, label) = {
             match &request {
-                RepairProtocol::WindowIndexWithNonce(_, slot, shred_index, nonce) => {
+                RepairProtocol::WindowIndex {
+                    header: RepairRequestHeader { nonce, .. },
+                    slot,
+                    shred_index,
+                }
+                | RepairProtocol::LegacyWindowIndexWithNonce(_, slot, shred_index, nonce) => {
                     stats.window_index += 1;
                     (
                         Self::run_window_request(
                             recycler,
-                            from,
                             from_addr,
                             blockstore,
-                            &my_id,
                             *slot,
                             *shred_index,
                             *nonce,
@@ -268,7 +383,17 @@ impl ServeRepair {
                         "WindowIndexWithNonce",
                     )
                 }
-                RepairProtocol::HighestWindowIndexWithNonce(_, slot, highest_index, nonce) => {
+                RepairProtocol::HighestWindowIndex {
+                    header: RepairRequestHeader { nonce, .. },
+                    slot,
+                    shred_index: highest_index,
+                }
+                | RepairProtocol::LegacyHighestWindowIndexWithNonce(
+                    _,
+                    slot,
+                    highest_index,
+                    nonce,
+                ) => {
                     stats.highest_window_index += 1;
                     (
                         Self::run_highest_window_request(
@@ -282,7 +407,11 @@ impl ServeRepair {
                         "HighestWindowIndexWithNonce",
                     )
                 }
-                RepairProtocol::OrphanWithNonce(_, slot, nonce) => {
+                RepairProtocol::Orphan {
+                    header: RepairRequestHeader { nonce, .. },
+                    slot,
+                }
+                | RepairProtocol::LegacyOrphanWithNonce(_, slot, nonce) => {
                     stats.orphan += 1;
                     (
                         Self::run_orphan(
@@ -296,18 +425,27 @@ impl ServeRepair {
                         "OrphanWithNonce",
                     )
                 }
-                RepairProtocol::AncestorHashes(_, slot, nonce) => {
+                RepairProtocol::AncestorHashes {
+                    header: RepairRequestHeader { nonce, .. },
+                    slot,
+                }
+                | RepairProtocol::LegacyAncestorHashes(_, slot, nonce) => {
                     stats.ancestor_hashes += 1;
                     (
                         Self::run_ancestor_hashes(recycler, from_addr, blockstore, *slot, *nonce),
                         "AncestorHashes",
                    )
                 }
-                _ => (None, "Unsupported repair type"),
+                RepairProtocol::Pong(pong) => {
+                    stats.pong += 1;
+                    ping_cache.add(pong, *from_addr, Instant::now());
+                    (None, "Pong")
+                }
+                RepairProtocol::LegacyWindowIndex(_, _, _)
+                | RepairProtocol::LegacyHighestWindowIndex(_, _, _)
+                | RepairProtocol::LegacyOrphan(_, _) => (None, "Unsupported repair type"),
             }
         };
-
-        trace!("{}: received repair request: {:?}", my_id, request);
         Self::report_time_spent(label, &now.elapsed(), "");
         res
     }
@@ -319,9 +457,28 @@ impl ServeRepair {
         }
     }
 
+    pub(crate) fn sign_repair_requests_activated_epoch(root_bank: &Bank) -> Option<Epoch> {
+        root_bank
+            .feature_set
+            .activated_slot(&sign_repair_requests::id())
+            .map(|slot| root_bank.epoch_schedule().get_epoch(slot))
+    }
+
+    pub(crate) fn should_sign_repair_request(
+        slot: Slot,
+        root_bank: &Bank,
+        sign_repairs_epoch: Option<Epoch>,
+    ) -> bool {
+        match sign_repairs_epoch {
+            None => false,
+            Some(feature_epoch) => feature_epoch < root_bank.epoch_schedule().get_epoch(slot),
+        }
+    }
+
     /// Process messages from the network
     fn run_listen(
         obj: &Arc<RwLock<Self>>,
+        ping_cache: &mut PingCache,
         recycler: &PacketBatchRecycler,
         blockstore: Option<&Arc<Blockstore>>,
         requests_receiver: &PacketBatchReceiver,
@@ -348,13 +505,16 @@ impl ServeRepair {
         stats.dropped_requests += dropped_requests;
         stats.total_requests += total_requests;
 
+        let root_bank = obj.read().unwrap().bank_forks.read().unwrap().root_bank();
         for reqs in reqs_v {
             Self::handle_packets(
                 obj,
+                ping_cache,
                 recycler,
                 blockstore,
                 reqs,
                 response_sender,
+                &root_bank,
                 stats,
                 data_budget,
             );
@@ -396,6 +556,13 @@ impl ServeRepair {
                 stats.ancestor_hashes,
                 i64
             ),
+            ("pong", stats.pong, i64),
+            ("pings_required", stats.pings_required, i64),
+            ("err_time_skew", stats.err_time_skew, i64),
+            ("err_malformed", stats.err_malformed, i64),
+            ("err_sig_verify", stats.err_sig_verify, i64),
+            ("err_unsigned", stats.err_unsigned, i64),
+            ("err_id_mismatch", stats.err_id_mismatch, i64),
         );
 
         *stats = ServeRepairStats::default();
@@ -412,6 +579,8 @@ impl ServeRepair {
         const MAX_BYTES_PER_SECOND: usize = 12_000_000;
         const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
 
+        let mut ping_cache = PingCache::new(REPAIR_PING_CACHE_TTL, REPAIR_PING_CACHE_CAPACITY);
+
         let exit = exit.clone();
         let recycler = PacketBatchRecycler::default();
         Builder::new()
@@ -423,6 +592,7 @@ impl ServeRepair {
                 loop {
                     let result = Self::run_listen(
                         &me,
+                        &mut ping_cache,
                        &recycler,
                         blockstore.as_ref(),
                         &requests_receiver,
@@ -447,77 +617,249 @@ impl ServeRepair {
             .unwrap()
     }
 
+    fn verify_signed_packet(
+        my_id: &Pubkey,
+        packet: &Packet,
+        request: &RepairProtocol,
+        stats: &mut ServeRepairStats,
+    ) -> bool {
+        match request {
+            RepairProtocol::LegacyWindowIndex(_, _, _)
+            | RepairProtocol::LegacyHighestWindowIndex(_, _, _)
+            | RepairProtocol::LegacyOrphan(_, _)
+            | RepairProtocol::LegacyWindowIndexWithNonce(_, _, _, _)
+            | RepairProtocol::LegacyHighestWindowIndexWithNonce(_, _, _, _)
+            | RepairProtocol::LegacyOrphanWithNonce(_, _, _)
+            | RepairProtocol::LegacyAncestorHashes(_, _, _) => {
+                debug_assert!(false); // expecting only signed request types
+                stats.err_unsigned += 1;
+                return false;
+            }
+            RepairProtocol::Pong(pong) => {
+                if !pong.verify() {
+                    stats.err_sig_verify += 1;
+                    return false;
+                }
+            }
+            RepairProtocol::WindowIndex { header, .. }
+            | RepairProtocol::HighestWindowIndex { header, .. }
+            | RepairProtocol::Orphan { header, .. }
+            | RepairProtocol::AncestorHashes { header, .. } => {
+                if &header.recipient != my_id {
+                    stats.err_id_mismatch += 1;
+                    return false;
+                }
+                let time_diff_ms = timestamp().abs_diff(header.timestamp);
+                if u128::from(time_diff_ms) > SIGNED_REPAIR_TIME_WINDOW.as_millis() {
+                    stats.err_time_skew += 1;
+                    return false;
+                }
+                let leading_buf = match packet.data(..4) {
+                    Some(buf) => buf,
+                    None => {
+                        debug_assert!(false); // should have failed deserialize
+                        stats.err_malformed += 1;
+                        return false;
+                    }
+                };
+                let trailing_buf = match packet.data(4 + SIGNATURE_BYTES..) {
+                    Some(buf) => buf,
+                    None => {
+                        debug_assert!(false); // should have failed deserialize
+                        stats.err_malformed += 1;
+                        return false;
+                    }
+                };
+                let from_id = request.sender();
+                let signed_data = [leading_buf, trailing_buf].concat();
+                if !header.signature.verify(from_id.as_ref(), &signed_data) {
+                    stats.err_sig_verify += 1;
+                    return false;
+                }
+            }
+        }
+        true
+    }
+
+    fn check_ping_cache(
+        request: &RepairProtocol,
+        from_addr: &SocketAddr,
+        identity_keypair: &Keypair,
+        socket_addr_space: &SocketAddrSpace,
+        ping_cache: &mut PingCache,
+        pending_pings: &mut Vec<(SocketAddr, Ping)>,
+        stats: &mut ServeRepairStats,
+    ) -> bool {
+        if !ContactInfo::is_valid_address(from_addr, socket_addr_space) {
+            stats.err_malformed += 1;
+            return false;
+        }
+        let mut rng = rand::thread_rng();
+        let mut pingf = move || Ping::new_rand(&mut rng, identity_keypair).ok();
+        let (check, ping) =
+            ping_cache.check(Instant::now(), (*request.sender(), *from_addr), &mut pingf);
+        if let Some(ping) = ping {
+            pending_pings.push((*from_addr, ping));
+        }
+        if !check {
+            stats.pings_required += 1;
+        }
+        check
+    }
+
+    fn requires_signature_check(
+        request: &RepairProtocol,
+        root_bank: &Bank,
+        sign_repairs_epoch: Option<Epoch>,
+    ) -> bool {
+        match request {
+            RepairProtocol::LegacyWindowIndex(_, slot, _)
+            | RepairProtocol::LegacyHighestWindowIndex(_, slot, _)
+            | RepairProtocol::LegacyOrphan(_, slot)
+            | RepairProtocol::LegacyWindowIndexWithNonce(_, slot, _, _)
+            | RepairProtocol::LegacyHighestWindowIndexWithNonce(_, slot, _, _)
+            | RepairProtocol::LegacyOrphanWithNonce(_, slot, _)
+            | RepairProtocol::LegacyAncestorHashes(_, slot, _)
+            | RepairProtocol::WindowIndex { slot, .. }
+            | RepairProtocol::HighestWindowIndex { slot, .. }
+            | RepairProtocol::Orphan { slot, .. }
+            | RepairProtocol::AncestorHashes { slot, .. } => {
+                Self::should_sign_repair_request(*slot, root_bank, sign_repairs_epoch)
+            }
+            RepairProtocol::Pong(_) => true,
+        }
+    }
+
     fn handle_packets(
         me: &Arc<RwLock<Self>>,
+        ping_cache: &mut PingCache,
         recycler: &PacketBatchRecycler,
         blockstore: Option<&Arc<Blockstore>>,
         packet_batch: PacketBatch,
         response_sender: &PacketBatchSender,
+        root_bank: &Bank,
         stats: &mut ServeRepairStats,
         data_budget: &DataBudget,
     ) {
+        let sign_repairs_epoch = Self::sign_repair_requests_activated_epoch(root_bank);
+        let (identity_keypair, socket_addr_space) = {
+            let me_r = me.read().unwrap();
+            let keypair = me_r.cluster_info.keypair().clone();
+            let socket_addr_space = *me_r.cluster_info.socket_addr_space();
+            (keypair, socket_addr_space)
+        };
+        let my_id = identity_keypair.pubkey();
+        let mut pending_pings = Vec::default();
+
         // iter over the packets
         for (i, packet) in packet_batch.iter().enumerate() {
-            if let Ok(request) = packet.deserialize_slice(..) {
-                stats.processed += 1;
-                let from_addr = packet.meta.socket_addr();
-                let rsp =
-                    match Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats)
-                    {
-                        None => continue,
-                        Some(rsp) => rsp,
-                    };
-                let num_response_packets = rsp.len();
-                let num_response_bytes = rsp.iter().map(|p| p.meta.size).sum();
-                if data_budget.take(num_response_bytes) && response_sender.send(rsp).is_ok() {
-                    stats.total_response_bytes += num_response_bytes;
-                    stats.total_response_packets += num_response_packets;
-                } else {
-                    stats.dropped_requests += packet_batch.len() - i;
-                    stats.total_dropped_response_packets += num_response_packets;
-                    break;
+            let request: RepairProtocol = match packet.deserialize_slice(..) {
+                Ok(request) => request,
+                Err(_) => {
+                    stats.err_malformed += 1;
+                    continue;
                 }
+            };
+
+            if request.sender() == &my_id {
+                stats.self_repair += 1;
+                continue;
+            }
+
+            let require_signature_check =
+                Self::requires_signature_check(&request, root_bank, sign_repairs_epoch);
+            if require_signature_check && !request.supports_signature() {
+                stats.err_unsigned += 1;
+                continue;
+            }
+            if request.supports_signature()
+                && !Self::verify_signed_packet(&my_id, packet, &request, stats)
+            {
+                continue;
+            }
+
+            let from_addr = packet.meta.socket_addr();
+            if request.requires_ping_check()
+                && !Self::check_ping_cache(
+                    &request,
+                    &from_addr,
+                    &identity_keypair,
+                    &socket_addr_space,
+                    ping_cache,
+                    &mut pending_pings,
+                    stats,
+                )
+            {
+                continue;
+            }
+
+            stats.processed += 1;
+            let rsp = match Self::handle_repair(
+                recycler, &from_addr, blockstore, request, stats, ping_cache,
+            ) {
+                None => continue,
+                Some(rsp) => rsp,
+            };
+            let num_response_packets = rsp.len();
+            let num_response_bytes = rsp.iter().map(|p| p.meta.size).sum();
+            if data_budget.take(num_response_bytes) && response_sender.send(rsp).is_ok() {
+                stats.total_response_bytes += num_response_bytes;
+                stats.total_response_packets += num_response_packets;
+            } else {
+                stats.dropped_requests += packet_batch.len() - i;
+                stats.total_dropped_response_packets += num_response_packets;
+                break;
             }
         }
-    }
 
-    fn window_index_request_bytes(
-        &self,
-        slot: Slot,
-        shred_index: u64,
-        nonce: Nonce,
-    ) -> Result<Vec<u8>> {
-        let req = RepairProtocol::WindowIndexWithNonce(self.my_info(), slot, shred_index, nonce);
-        let out = serialize(&req)?;
-        Ok(out)
-    }
-
-    fn window_highest_index_request_bytes(
-        &self,
-        slot: Slot,
-        shred_index: u64,
-        nonce: Nonce,
-    ) -> Result<Vec<u8>> {
-        let req =
-            RepairProtocol::HighestWindowIndexWithNonce(self.my_info(), slot, shred_index, nonce);
-        let out = serialize(&req)?;
-        Ok(out)
-    }
-
-    fn orphan_bytes(&self, slot: Slot, nonce: Nonce) -> Result<Vec<u8>> {
-        let req = RepairProtocol::OrphanWithNonce(self.my_info(), slot, nonce);
-        let out = serialize(&req)?;
-        Ok(out)
+        if !pending_pings.is_empty() {
+            let packets: Vec<_> = pending_pings
+                .into_iter()
+                .filter_map(|(sockaddr, ping)| {
+                    let ping = RepairResponse::Ping(ping);
+                    Packet::from_data(Some(&sockaddr), ping).ok()
+                })
+                .collect();
+            let batch = PacketBatch::new(packets);
+            let _ = response_sender.send(batch);
+        }
     }
 
     pub fn ancestor_repair_request_bytes(
         &self,
+        keypair: &Keypair,
+        root_bank: &Bank,
+        repair_peer_id: &Pubkey,
         request_slot: Slot,
         nonce: Nonce,
     ) -> Result<Vec<u8>> {
-        let repair_request = RepairProtocol::AncestorHashes(self.my_info(), request_slot, nonce);
-        let out = serialize(&repair_request)?;
-        Ok(out)
+        let sign_repairs_epoch = Self::sign_repair_requests_activated_epoch(root_bank);
+        let require_sig =
+            Self::should_sign_repair_request(request_slot, root_bank, sign_repairs_epoch);
+
+        let (request_proto, maybe_keypair) = if require_sig {
+            let header = RepairRequestHeader {
+                signature: Signature::default(),
+                sender: self.my_id(),
+                recipient: *repair_peer_id,
+                timestamp: timestamp(),
+                nonce,
+            };
+            (
+                RepairProtocol::AncestorHashes {
+                    header,
+                    slot: request_slot,
+                },
+                Some(keypair),
+            )
+        } else {
+            (
+                RepairProtocol::LegacyAncestorHashes(self.my_info(), request_slot, nonce),
+                None,
+            )
+        };
+
+        Self::repair_proto_to_bytes(&request_proto, maybe_keypair)
     }
 
     pub(crate) fn repair_request(
@@ -528,6 +870,7 @@ impl ServeRepair {
         repair_stats: &mut RepairStats,
         repair_validators: &Option<HashSet<Pubkey>>,
         outstanding_requests: &mut OutstandingShredRepairs,
+        identity_keypair: Option<&Keypair>,
     ) -> Result<(SocketAddr, Vec<u8>)> {
         // find a peer that appears to be accepting replication and has the desired slot, as indicated
         // by a valid tvu port location
@@ -544,13 +887,18 @@ impl ServeRepair {
             }
         };
         let (peer, addr) = repair_peers.sample(&mut rand::thread_rng());
-        let nonce =
-            outstanding_requests.add_request(repair_request, solana_sdk::timing::timestamp());
-        let out = self.map_repair_request(&repair_request, &peer, repair_stats, nonce)?;
+        let nonce = outstanding_requests.add_request(repair_request, timestamp());
+        let out = self.map_repair_request(
+            &repair_request,
+            &peer,
+            repair_stats,
+            nonce,
+            identity_keypair,
+        )?;
         Ok((addr, out))
     }
 
-    pub fn repair_request_ancestor_hashes_sample_peers(
+    pub(crate) fn repair_request_ancestor_hashes_sample_peers(
         &self,
         slot: Slot,
         cluster_slots: &ClusterSlots,
@@ -592,31 +940,134 @@ impl ServeRepair {
         Ok((repair_peers[n].id, repair_peers[n].serve_repair))
     }
 
-    pub fn map_repair_request(
+    pub(crate) fn map_repair_request(
         &self,
         repair_request: &ShredRepairType,
         repair_peer_id: &Pubkey,
         repair_stats: &mut RepairStats,
         nonce: Nonce,
+        identity_keypair: Option<&Keypair>,
     ) -> Result<Vec<u8>> {
-        match repair_request {
+        let header = if identity_keypair.is_some() {
+            Some(RepairRequestHeader {
+                signature: Signature::default(),
+                sender: self.my_id(),
+                recipient: *repair_peer_id,
+                timestamp: timestamp(),
+                nonce,
+            })
+        } else {
+            None
+        };
+        let request_proto = match repair_request {
             ShredRepairType::Shred(slot, shred_index) => {
                 repair_stats
                     .shred
                     .update(repair_peer_id, *slot, *shred_index);
-                Ok(self.window_index_request_bytes(*slot, *shred_index, nonce)?)
+                if let Some(header) = header {
+                    RepairProtocol::WindowIndex {
+                        header,
+                        slot: *slot,
+                        shred_index: *shred_index,
+                    }
+                } else {
+                    RepairProtocol::LegacyWindowIndexWithNonce(
+                        self.my_info(),
+                        *slot,
+                        *shred_index,
+                        nonce,
+                    )
+                }
             }
             ShredRepairType::HighestShred(slot, shred_index) => {
                 repair_stats
                     .highest_shred
                     .update(repair_peer_id, *slot, *shred_index);
-                Ok(self.window_highest_index_request_bytes(*slot, *shred_index, nonce)?)
+                if let Some(header) = header {
+                    RepairProtocol::HighestWindowIndex {
+                        header,
+                        slot: *slot,
+                        shred_index: *shred_index,
+                    }
+                } else {
+                    RepairProtocol::LegacyHighestWindowIndexWithNonce(
+                        self.my_info(),
+                        *slot,
+                        *shred_index,
+                        nonce,
+                    )
+                }
             }
             ShredRepairType::Orphan(slot) => {
                 repair_stats.orphan.update(repair_peer_id, *slot, 0);
-                Ok(self.orphan_bytes(*slot, nonce)?)
+                if let Some(header) = header {
+                    RepairProtocol::Orphan {
+                        header,
+                        slot: *slot,
+                    }
+                } else {
+                    RepairProtocol::LegacyOrphanWithNonce(self.my_info(), *slot, nonce)
+                }
             }
-        }
+        };
+        Self::repair_proto_to_bytes(&request_proto, identity_keypair)
+    }
+
+    /// Distinguish and process `RepairResponse` ping packets ignoring other
+    /// packets in the batch.
+    pub(crate) fn handle_repair_response_pings(
+        repair_socket: &UdpSocket,
+        keypair: &Keypair,
+        packet_batch: &mut PacketBatch,
+        stats: &mut ShredFetchStats,
+    ) {
+        let mut pending_pongs = Vec::default();
+        for packet in packet_batch.iter_mut() {
+            if packet.meta.size != REPAIR_RESPONSE_SERIALIZED_PING_BYTES {
+                continue;
+            }
+            if let Ok(RepairResponse::Ping(ping)) = packet.deserialize_slice(..) {
+                packet.meta.set_discard(true);
+                if !ping.verify() {
+                    stats.ping_err_verify_count += 1;
+                    continue;
+                }
+                stats.ping_count += 1;
+                if let Ok(pong) = Pong::new(&ping, keypair) {
+                    let pong = RepairProtocol::Pong(pong);
+                    if let Ok(pong_bytes) = serialize(&pong) {
+                        let from_addr = packet.meta.socket_addr();
+                        pending_pongs.push((pong_bytes, from_addr));
+                    }
+                }
+            }
+        }
+        if !pending_pongs.is_empty() {
+            if let Err(SendPktsError::IoError(err, num_failed)) =
+                batch_send(repair_socket, &pending_pongs)
+            {
+                warn!(
+                    "batch_send failed to send {}/{} packets. First error: {:?}",
+                    num_failed,
+                    pending_pongs.len(),
+                    err
+                );
+            }
+        }
+    }
+
+    pub fn repair_proto_to_bytes(
+        request: &RepairProtocol,
+        keypair: Option<&Keypair>,
+    ) -> Result<Vec<u8>> {
+        let mut payload = serialize(&request)?;
+        if let Some(keypair) = keypair {
+            debug_assert!(request.supports_signature());
+            let signable_data = [&payload[..4], &payload[4 + SIGNATURE_BYTES..]].concat();
+            let signature = keypair.sign_message(&signable_data[..]);
+            payload[4..4 + SIGNATURE_BYTES].copy_from_slice(signature.as_ref());
+        }
+        Ok(payload)
     }
 
     fn repair_peers(
@@ -642,10 +1093,8 @@ impl ServeRepair {
 
     fn run_window_request(
         recycler: &PacketBatchRecycler,
-        from: &ContactInfo,
         from_addr: &SocketAddr,
         blockstore: Option<&Arc<Blockstore>>,
-        my_id: &Pubkey,
         slot: Slot,
         shred_index: u64,
         nonce: Nonce,
@@ -671,14 +1120,6 @@ impl ServeRepair {
         }
 
         inc_new_counter_debug!("serve_repair-window-request-fail", 1);
-        trace!(
-            "{}: failed WindowIndex {} {} {}",
-            my_id,
-            from.id,
-            slot,
-            shred_index,
-        );
-
         None
     }
@@ -797,14 +1238,363 @@ mod tests {
         solana_ledger::{
             blockstore::make_many_slot_entries,
             blockstore_processor::fill_blockstore_slot_with_ticks,
+            genesis_utils::{create_genesis_config, GenesisConfigInfo},
             get_tmp_ledger_path,
             shred::{max_ticks_per_n_shreds, Shred, ShredFlags},
         },
-        solana_perf::packet::Packet,
-        solana_sdk::{hash::Hash, pubkey::Pubkey, signature::Keypair, timing::timestamp},
+        solana_perf::packet::{deserialize_from_with_limit, Packet},
+        solana_runtime::bank::Bank,
+        solana_sdk::{
+            feature_set::FeatureSet, hash::Hash, pubkey::Pubkey, signature::Keypair,
+            timing::timestamp,
+        },
         solana_streamer::socket::SocketAddrSpace,
+        std::io::Cursor,
     };
 
+    #[test]
+    fn test_serialized_ping_size() {
+        let mut rng = rand::thread_rng();
+        let keypair = Keypair::new();
+        let ping = Ping::new_rand(&mut rng, &keypair).unwrap();
+        let ping = RepairResponse::Ping(ping);
+        let pkt = Packet::from_data(None, ping).unwrap();
+        assert_eq!(pkt.meta.size, REPAIR_RESPONSE_SERIALIZED_PING_BYTES);
+    }
+
+    #[test]
+    fn test_serialize_deserialize_signed_request() {
+        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
+        let bank = Bank::new_for_tests(&genesis_config);
+        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
+        let me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
+        let cluster_info = Arc::new(new_test_cluster_info(me));
+        let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks);
+        let keypair = cluster_info.keypair().clone();
+        let repair_peer_id = solana_sdk::pubkey::new_rand();
+        let repair_request = ShredRepairType::Orphan(123);
+
+        let rsp = serve_repair
+            .map_repair_request(
+                &repair_request,
+                &repair_peer_id,
+                &mut RepairStats::default(),
+                456,
+                Some(&keypair),
+            )
+            .unwrap();
+
+        let mut cursor = Cursor::new(&rsp[..]);
+        let deserialized_request: RepairProtocol =
+            deserialize_from_with_limit(&mut cursor).unwrap();
+        assert_eq!(cursor.position(), rsp.len() as u64);
+        if let RepairProtocol::Orphan { header, slot } = deserialized_request {
+            assert_eq!(slot, 123);
+            assert_eq!(header.nonce, 456);
+            assert_eq!(&header.sender, &serve_repair.my_id());
+            assert_eq!(&header.recipient, &repair_peer_id);
+            let signed_data = [&rsp[..4], &rsp[4 + SIGNATURE_BYTES..]].concat();
+            assert!(header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &signed_data));
+        } else {
+            panic!("unexpected request type {:?}", &deserialized_request);
+        }
+    }
+
+    #[test]
+    fn test_serialize_deserialize_ancestor_hashes_request() {
+        let slot = 50;
+        let nonce = 70;
+        let me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
+        let cluster_info = Arc::new(new_test_cluster_info(me));
+        let repair_peer_id = solana_sdk::pubkey::new_rand();
+        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
+        let keypair = cluster_info.keypair().clone();
+
+        let mut bank = Bank::new_for_tests(&genesis_config);
+        bank.feature_set = Arc::new(FeatureSet::all_enabled());
+        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
+        let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks.clone());
+
+        let root_bank = bank_forks.read().unwrap().root_bank();
+        let request_bytes = serve_repair
+            .ancestor_repair_request_bytes(&keypair, &root_bank, &repair_peer_id, slot, nonce)
+            .unwrap();
+        let mut cursor = Cursor::new(&request_bytes[..]);
+        let deserialized_request: RepairProtocol =
+            deserialize_from_with_limit(&mut cursor).unwrap();
+        assert_eq!(cursor.position(), request_bytes.len() as u64);
+        if let RepairProtocol::AncestorHashes {
+            header,
+            slot: deserialized_slot,
+        } = deserialized_request
+        {
+            assert_eq!(deserialized_slot, slot);
+            assert_eq!(header.nonce, nonce);
+            assert_eq!(&header.sender, &serve_repair.my_id());
+            assert_eq!(&header.recipient, &repair_peer_id);
+            let signed_data =
+                [&request_bytes[..4], &request_bytes[4 + SIGNATURE_BYTES..]].concat();
+            assert!(header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &signed_data));
+        } else {
+            panic!("unexpected request type {:?}", &deserialized_request);
+        }
+
+        let mut bank = Bank::new_for_tests(&genesis_config);
+        let mut feature_set = FeatureSet::all_enabled();
+        feature_set.deactivate(&sign_repair_requests::id());
+        bank.feature_set = Arc::new(feature_set);
+        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
+        let serve_repair = ServeRepair::new(cluster_info, bank_forks.clone());
+
+        let root_bank = bank_forks.read().unwrap().root_bank();
+        let request_bytes = serve_repair
+            .ancestor_repair_request_bytes(&keypair, &root_bank, &repair_peer_id, slot, nonce)
+            .unwrap();
+        let mut cursor = Cursor::new(&request_bytes[..]);
+        let deserialized_request: RepairProtocol =
+            deserialize_from_with_limit(&mut cursor).unwrap();
+        assert_eq!(cursor.position(), request_bytes.len() as u64);
+        if let RepairProtocol::LegacyAncestorHashes(ci, deserialized_slot, deserialized_nonce) =
+            deserialized_request
+        {
+            assert_eq!(slot, deserialized_slot);
+            assert_eq!(nonce, deserialized_nonce);
+            assert_eq!(&serve_repair.my_id(), &ci.id);
+        } else {
+            panic!("unexpected request type {:?}", &deserialized_request);
+        }
+    }
+
+    #[test]
+    fn test_map_requests_signed_unsigned() {
+        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
+        let bank = Bank::new_for_tests(&genesis_config);
+        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
+        let me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
+        let cluster_info = Arc::new(new_test_cluster_info(me));
+        let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks);
+        let keypair = cluster_info.keypair().clone();
+        let repair_peer_id = solana_sdk::pubkey::new_rand();
+
+        let slot = 50;
+        let shred_index = 60;
+        let nonce = 70;
+
+        let request = ShredRepairType::Shred(slot, shred_index);
+        let rsp = serve_repair
+            .map_repair_request(
+                &request,
+                &repair_peer_id,
+                &mut RepairStats::default(),
+                nonce,
+                Some(&keypair),
+            )
+            .unwrap();
+
+        let mut cursor = Cursor::new(&rsp[..]);
+        let deserialized_request: RepairProtocol =
+            deserialize_from_with_limit(&mut cursor).unwrap();
+        assert_eq!(cursor.position(), rsp.len() as u64);
+        if let RepairProtocol::WindowIndex {
+            header,
+            slot: deserialized_slot,
+            shred_index: deserialized_shred_index,
+        } = deserialized_request
+        {
+            assert_eq!(deserialized_slot, slot);
+            assert_eq!(deserialized_shred_index, shred_index);
+            assert_eq!(header.nonce, nonce);
+            assert_eq!(&header.sender, &serve_repair.my_id());
+            assert_eq!(&header.recipient, &repair_peer_id);
+            let signed_data = [&rsp[..4], &rsp[4 + SIGNATURE_BYTES..]].concat();
+            assert!(header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &signed_data));
+        } else {
+            panic!("unexpected request type {:?}", &deserialized_request);
+        }
+
+        let rsp = serve_repair
+            .map_repair_request(
+                &request,
+                &repair_peer_id,
+                &mut RepairStats::default(),
+                nonce,
+                None,
+            )
+            .unwrap();
+
+        let mut cursor = Cursor::new(&rsp[..]);
+        let deserialized_request: RepairProtocol =
+            deserialize_from_with_limit(&mut cursor).unwrap();
+        assert_eq!(cursor.position(), rsp.len() as u64);
+        if let RepairProtocol::LegacyWindowIndexWithNonce(
+            ci,
+            deserialized_slot,
+            deserialized_shred_index,
+            deserialized_nonce,
+        ) = deserialized_request
+        {
+            assert_eq!(slot, deserialized_slot);
+            assert_eq!(shred_index, deserialized_shred_index);
+            assert_eq!(nonce, deserialized_nonce);
+            assert_eq!(&serve_repair.my_id(), &ci.id);
+        } else {
+            panic!("unexpected request type {:?}", &deserialized_request);
+        }
+
+        let request = ShredRepairType::HighestShred(slot, shred_index);
+        let rsp = serve_repair
+            .map_repair_request(
+                &request,
+                &repair_peer_id,
+                &mut RepairStats::default(),
+                nonce,
+                Some(&keypair),
+            )
+            .unwrap();
+
+        let mut cursor = Cursor::new(&rsp[..]);
+        let deserialized_request: RepairProtocol =
+            deserialize_from_with_limit(&mut cursor).unwrap();
+        assert_eq!(cursor.position(), rsp.len() as u64);
+        if let RepairProtocol::HighestWindowIndex {
+            header,
+            slot: deserialized_slot,
+            shred_index: deserialized_shred_index,
+        } = deserialized_request
+        {
+            assert_eq!(deserialized_slot, slot);
+            assert_eq!(deserialized_shred_index, shred_index);
+            assert_eq!(header.nonce, nonce);
+            assert_eq!(&header.sender, &serve_repair.my_id());
+            assert_eq!(&header.recipient, &repair_peer_id);
+            let signed_data = [&rsp[..4], &rsp[4 + SIGNATURE_BYTES..]].concat();
+            assert!(header
+                .signature
+                .verify(keypair.pubkey().as_ref(), &signed_data));
+        } else {
+            panic!("unexpected request type {:?}", &deserialized_request);
+        }
+
+        let rsp = serve_repair
+            .map_repair_request(
+                &request,
+                &repair_peer_id,
+                &mut RepairStats::default(),
+                nonce,
+                None,
+            )
+            .unwrap();
+
+        let mut cursor = Cursor::new(&rsp[..]);
+        let deserialized_request: RepairProtocol =
+            deserialize_from_with_limit(&mut cursor).unwrap();
+        assert_eq!(cursor.position(), rsp.len() as u64);
+        if let RepairProtocol::LegacyHighestWindowIndexWithNonce(
+            ci,
+            deserialized_slot,
+            deserialized_shred_index,
+            deserialized_nonce,
+        ) = deserialized_request
+        {
+            assert_eq!(slot, deserialized_slot);
+            assert_eq!(shred_index, deserialized_shred_index);
+            assert_eq!(nonce, deserialized_nonce);
+            assert_eq!(&serve_repair.my_id(), &ci.id);
+        } else {
+            panic!("unexpected request type {:?}", &deserialized_request);
+        }
+    }
+
+    #[test]
+    fn test_verify_signed_packet() {
+        let keypair = Keypair::new();
+        let other_keypair = Keypair::new();
+        let my_id = Pubkey::new_unique();
+        let other_id = Pubkey::new_unique();
+
+        fn sign_packet(packet: &mut Packet, keypair: &Keypair) {
+            let signable_data = [
+                packet.data(..4).unwrap(),
+                packet.data(4 + SIGNATURE_BYTES..).unwrap(),
+            ]
+            .concat();
+            let signature = keypair.sign_message(&signable_data[..]);
+            packet.buffer_mut()[4..4 + SIGNATURE_BYTES].copy_from_slice(signature.as_ref());
+        }
+
+        // well formed packet
+        let packet = {
+            let header = RepairRequestHeader::new(keypair.pubkey(), my_id, timestamp(), 678);
+            let slot = 239847;
+            let request = RepairProtocol::Orphan { header, slot };
+            let mut packet = Packet::from_data(None, &request).unwrap();
+            sign_packet(&mut packet, &keypair);
+            packet
+        };
+        let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
+        assert!(ServeRepair::verify_signed_packet(
+            &my_id,
+            &packet,
+            &request,
+            &mut ServeRepairStats::default(),
+        ));
+
+        // recipient mismatch
+        let packet = {
+            let header = RepairRequestHeader::new(keypair.pubkey(), other_id, timestamp(), 678);
+            let slot = 239847;
+            let request = RepairProtocol::Orphan { header, slot };
+            let mut packet = Packet::from_data(None, &request).unwrap();
+            sign_packet(&mut packet, &keypair);
+            packet
+        };
+        let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
+        let mut stats = ServeRepairStats::default();
+        assert!(!ServeRepair::verify_signed_packet(
+            &my_id, &packet, &request, &mut stats,
+        ));
+        assert_eq!(stats.err_id_mismatch, 1);
+
+        // outside time window
+        let packet = {
+            let time_diff_ms = u64::try_from(SIGNED_REPAIR_TIME_WINDOW.as_millis() * 2).unwrap();
+            let old_timestamp = timestamp().saturating_sub(time_diff_ms);
+            let header = RepairRequestHeader::new(keypair.pubkey(), my_id, old_timestamp, 678);
+            let slot = 239847;
+            let request = RepairProtocol::Orphan { header, slot };
+            let mut packet = Packet::from_data(None, &request).unwrap();
+            sign_packet(&mut packet, &keypair);
+            packet
+        };
+        let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
+        let mut stats = ServeRepairStats::default();
+        assert!(!ServeRepair::verify_signed_packet(
+            &my_id, &packet, &request, &mut stats,
+        ));
+        assert_eq!(stats.err_time_skew, 1);
+
+        // bad signature
+        let packet = {
+            let header = RepairRequestHeader::new(keypair.pubkey(), my_id, timestamp(), 678);
+            let slot = 239847;
+            let request = RepairProtocol::Orphan { header, slot };
+            let mut packet = Packet::from_data(None, &request).unwrap();
+            sign_packet(&mut packet, &other_keypair);
+            packet
+        };
+        let request: RepairProtocol = packet.deserialize_slice(..).unwrap();
+        let mut stats = ServeRepairStats::default();
+        assert!(!ServeRepair::verify_signed_packet(
+            &my_id, &packet, &request, &mut stats,
+        ));
+        assert_eq!(stats.err_sig_verify, 1);
+    }
+
     #[test]
     fn test_run_highest_window_request() {
         run_highest_window_request(5, 3, 9);
@@ -886,27 +1676,10 @@ mod tests {
         let ledger_path = get_tmp_ledger_path!();
         {
             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
-            let me = ContactInfo {
-                id: solana_sdk::pubkey::new_rand(),
-                gossip: socketaddr!("127.0.0.1:1234"),
-                tvu: socketaddr!("127.0.0.1:1235"),
-                tvu_forwards: socketaddr!("127.0.0.1:1236"),
-                repair: socketaddr!("127.0.0.1:1237"),
-                tpu: socketaddr!("127.0.0.1:1238"),
-                tpu_forwards: socketaddr!("127.0.0.1:1239"),
-                tpu_vote: socketaddr!("127.0.0.1:1240"),
-                rpc: socketaddr!("127.0.0.1:1241"),
-                rpc_pubsub: socketaddr!("127.0.0.1:1242"),
-                serve_repair: socketaddr!("127.0.0.1:1243"),
-                wallclock: 0,
-                shred_version: 0,
-            };
             let rv = ServeRepair::run_window_request(
                 &recycler,
-                &me,
                 &socketaddr_any!(),
                 Some(&blockstore),
-                &me.id,
                 slot,
                 0,
                 nonce,
@@ -921,10 +1694,8 @@ mod tests {
             let index = 1;
             let rv = ServeRepair::run_window_request(
                 &recycler,
-                &me,
                 &socketaddr_any!(),
                 Some(&blockstore),
-                &me.id,
                 slot,
                 index,
                 nonce,
@@ -956,10 +1727,13 @@ mod tests {
 
     #[test]
     fn window_index_request() {
+        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
+        let bank = Bank::new_for_tests(&genesis_config);
+        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
         let cluster_slots = ClusterSlots::default();
         let me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
         let cluster_info = Arc::new(new_test_cluster_info(me));
-        let serve_repair = ServeRepair::new(cluster_info.clone());
+        let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks);
         let mut outstanding_requests = OutstandingShredRepairs::default();
         let rv = serve_repair.repair_request(
             &cluster_slots,
@@ -968,6 +1742,7 @@ mod tests {
             &mut RepairStats::default(),
             &None,
            &mut outstanding_requests,
+            None,
         );
         assert_matches!(rv, Err(Error::ClusterInfo(ClusterInfoError::NoPeers)));
 
@@ -996,6 +1771,7 @@ mod tests {
                 &mut RepairStats::default(),
                 &None,
                 &mut outstanding_requests,
+                None,
             )
             .unwrap();
         assert_eq!(nxt.serve_repair, serve_repair_addr);
@@ -1030,6 +1806,7 @@ mod tests {
                 &mut RepairStats::default(),
                 &None,
                 &mut outstanding_requests,
+                None,
             )
             .unwrap();
         if rv.0 == serve_repair_addr {
@@ -1265,6 +2042,9 @@ mod tests {
 
     #[test]
     fn test_repair_with_repair_validators() {
+        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
+        let bank = Bank::new_for_tests(&genesis_config);
+        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
         let cluster_slots = ClusterSlots::default();
         let me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
         let cluster_info = Arc::new(new_test_cluster_info(me.clone()));
@@ -1276,7 +2056,7 @@ mod tests {
             ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
         cluster_info.insert_info(contact_info2.clone());
         cluster_info.insert_info(contact_info3.clone());
-        let serve_repair = ServeRepair::new(cluster_info);
+        let serve_repair = ServeRepair::new(cluster_info, bank_forks);
 
         // If:
         // 1) repair validator set doesn't exist in gossip
@@ -1293,6 +2073,7 @@ mod tests {
                 &mut RepairStats::default(),
                 &known_validators,
                 &mut OutstandingShredRepairs::default(),
+                None,
            )
             .is_err());
     }
@@ -1310,6 +2091,7 @@ mod tests {
                 &mut RepairStats::default(),
                 &known_validators,
                 &mut OutstandingShredRepairs::default(),
+                None,
             )
             .is_ok());
 
@@ -1331,6 +2113,7 @@ mod tests {
                 &mut RepairStats::default(),
                 &None,
                 &mut OutstandingShredRepairs::default(),
+                None,
             )
             .is_ok());
     }
diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs
index 78b5e15b95..ae604e766f 100644
--- a/core/src/shred_fetch_stage.rs
+++ b/core/src/shred_fetch_stage.rs
@@ -1,9 +1,10 @@
 //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.
 
 use {
-    crate::packet_hasher::PacketHasher,
+    crate::{packet_hasher::PacketHasher, serve_repair::ServeRepair},
     crossbeam_channel::{unbounded, Sender},
     lru::LruCache,
+    solana_gossip::cluster_info::ClusterInfo,
     solana_ledger::shred::{should_discard_shred, ShredFetchStats},
     solana_perf::packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
     solana_runtime::bank_forks::BankForks,
@@ -33,10 +34,14 @@ impl ShredFetchStage {
         shred_version: u16,
         name: &'static str,
         flags: PacketFlags,
+        repair_context: Option<(&UdpSocket, &ClusterInfo)>,
     ) {
         const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
         let mut shreds_received = LruCache::new(DEFAULT_LRU_SIZE);
         let mut last_updated = Instant::now();
+        let mut keypair = repair_context
+            .as_ref()
+            .map(|(_, cluster_info)| cluster_info.keypair().clone());
 
         // In the case of bank_forks=None, setup to accept any slot range
         let mut last_root = 0;
@@ -59,8 +64,25 @@ impl ShredFetchStage {
                     let root_bank = bank_forks_r.root_bank();
                     slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch());
                 }
+                keypair = repair_context
+                    .as_ref()
+                    .map(|(_, cluster_info)| cluster_info.keypair().clone());
             }
             stats.shred_count += packet_batch.len();
+
+            if let Some((udp_socket, _)) = repair_context {
+                debug_assert_eq!(flags, PacketFlags::REPAIR);
+                debug_assert!(keypair.is_some());
+                if let Some(ref keypair) = keypair {
+                    ServeRepair::handle_repair_response_pings(
+                        udp_socket,
+                        keypair,
+                        &mut packet_batch,
+                        &mut stats,
+                    );
+                }
+            }
+
             // Limit shreds to 2 epochs away.
             let max_slot = last_slot + 2 * slots_per_epoch;
             for packet in packet_batch.iter_mut() {
@@ -94,6 +116,7 @@ impl ShredFetchStage {
         shred_version: u16,
         name: &'static str,
         flags: PacketFlags,
+        repair_context: Option<(Arc<UdpSocket>, Arc<ClusterInfo>)>,
     ) -> (Vec<JoinHandle<()>>, JoinHandle<()>) {
         let (packet_sender, packet_receiver) = unbounded();
         let streamers = sockets
@@ -111,10 +134,12 @@ impl ShredFetchStage {
                 )
             })
             .collect();
-
         let modifier_hdl = Builder::new()
             .name("solana-tvu-fetch-stage-packet-modifier".to_string())
             .spawn(move || {
+                let repair_context = repair_context
+                    .as_ref()
+                    .map(|(socket, cluster_info)| (socket.as_ref(), cluster_info.as_ref()));
                 Self::modify_packets(
                     packet_receiver,
                     sender,
@@ -122,6 +147,7 @@ impl ShredFetchStage {
                     shred_version,
                     name,
                     flags,
+                    repair_context,
                 )
             })
             .unwrap();
@@ -135,6 +161,7 @@ impl ShredFetchStage {
         sender: Sender<PacketBatch>,
         shred_version: u16,
         bank_forks: Arc<RwLock<BankForks>>,
+        cluster_info: Arc<ClusterInfo>,
         exit: &Arc<AtomicBool>,
     ) -> Self {
         let recycler = PacketBatchRecycler::warmed(100, 1024);
@@ -148,6 +175,7 @@ impl ShredFetchStage {
             shred_version,
             "shred_fetch",
             PacketFlags::empty(),
+            None, // repair_context
         );
 
         let (tvu_forwards_threads, fwd_thread_hdl) = Self::packet_modifier(
@@ -159,10 +187,11 @@ impl ShredFetchStage {
             shred_version,
             "shred_fetch_tvu_forwards",
             PacketFlags::FORWARDED,
+            None, // repair_context
         );
 
         let (repair_receiver, repair_handler) = Self::packet_modifier(
-            vec![repair_socket],
+            vec![repair_socket.clone()],
             exit,
             sender,
             recycler,
@@ -170,6 +199,7 @@ impl ShredFetchStage {
             shred_version,
             "shred_fetch_repair",
             PacketFlags::REPAIR,
+            Some((repair_socket, cluster_info)),
         );
 
         tvu_threads.extend(tvu_forwards_threads.into_iter());
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 9004131c56..990b943f74 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -152,6 +152,7 @@ impl Tvu {
             fetch_sender,
             tvu_config.shred_version,
             bank_forks.clone(),
+            cluster_info.clone(),
             exit,
         );
 
diff --git a/core/src/validator.rs b/core/src/validator.rs
index ef0848ceca..867d1e698f 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -883,7 +883,10 @@ impl Validator {
             Some(stats_reporter_sender.clone()),
             &exit,
         );
-        let serve_repair = Arc::new(RwLock::new(ServeRepair::new(cluster_info.clone())));
+        let serve_repair = Arc::new(RwLock::new(ServeRepair::new(
+            cluster_info.clone(),
+            bank_forks.clone(),
+        )));
         let serve_repair_service = ServeRepairService::new(
             &serve_repair,
             Some(blockstore.clone()),
diff --git a/dos/src/main.rs b/dos/src/main.rs
index 6fefd16af3..d77ac1d620 100644
--- a/dos/src/main.rs
+++ b/dos/src/main.rs
@@ -49,7 +49,7 @@ use {
         rpc_client::RpcClient,
         tpu_connection::TpuConnection,
     },
-    solana_core::serve_repair::RepairProtocol,
+    solana_core::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair},
     solana_dos::cli::*,
     solana_gossip::{
         contact_info::ContactInfo,
@@ -64,6 +64,7 @@ use {
         stake,
         system_instruction::{self, SystemInstruction},
         system_program,
+        timing::timestamp,
         transaction::Transaction,
     },
     solana_streamer::socket::SocketAddrSpace,
@@ -81,13 +82,6 @@ fn compute_rate_per_second(count: usize) -> usize {
     (count * 1000) / SAMPLE_PERIOD_MS
 }
 
-fn get_repair_contact(nodes: &[ContactInfo]) -> ContactInfo {
-    let source = thread_rng().gen_range(0, nodes.len());
-    let mut contact = nodes[source].clone();
-    contact.id = solana_sdk::pubkey::new_rand();
-    contact
-}
-
 /// Provide functionality to generate several types of transactions:
 ///
 /// 1. Without blockhash
@@ -241,11 +235,11 @@ fn get_target(
     nodes: &[ContactInfo],
     mode: Mode,
     entrypoint_addr: SocketAddr,
-) -> Option<SocketAddr> {
+) -> Option<(Pubkey, SocketAddr)> {
     let mut target = None;
     if nodes.is_empty() {
         // skip-gossip case
-        target = Some(entrypoint_addr);
+        target = Some((solana_sdk::pubkey::new_rand(), entrypoint_addr));
     } else {
         info!("************ NODE ***********");
         for node in nodes {
@@ -257,13 +251,13 @@ fn get_target(
             if node.gossip == entrypoint_addr {
                 info!("{}", node.gossip);
                 target = match mode {
-                    Mode::Gossip => Some(node.gossip),
-                    Mode::Tvu => Some(node.tvu),
-                    Mode::TvuForwards => Some(node.tvu_forwards),
-                    Mode::Tpu => Some(node.tpu),
-                    Mode::TpuForwards => Some(node.tpu_forwards),
-                    Mode::Repair => Some(node.repair),
-                    Mode::ServeRepair => Some(node.serve_repair),
+                    Mode::Gossip => Some((node.id, node.gossip)),
+                    Mode::Tvu => Some((node.id, node.tvu)),
+                    Mode::TvuForwards => Some((node.id, node.tvu_forwards)),
+                    Mode::Tpu => Some((node.id, node.tpu)),
+                    Mode::TpuForwards => Some((node.id, node.tpu_forwards)),
+                    Mode::Repair => Some((node.id, node.repair)),
+                    Mode::ServeRepair => Some((node.id, node.serve_repair)),
                     Mode::Rpc => None,
                 };
                 break;
@@ -500,39 +494,47 @@ fn run_dos(
        }
    } else if params.data_type == DataType::Transaction
         && params.transaction_params.unique_transactions
     {
-        let target = target.expect("should have target");
-        info!("Targeting {}", target);
+        let (_, target_addr) = target.expect("should have target");
+        info!("Targeting {}", target_addr);
         run_dos_transactions(
-            target,
+            target_addr,
             iterations,
             client,
             params.transaction_params,
             params.tpu_use_quic,
         );
     } else {
-        let target = target.expect("should have target");
-        info!("Targeting {}", target);
+        let (target_id, target_addr) = target.expect("should have target");
+        info!("Targeting {}", target_addr);
         let mut data = match params.data_type {
             DataType::RepairHighest => {
                 let slot = 100;
-                let req =
-                    RepairProtocol::WindowIndexWithNonce(get_repair_contact(nodes), slot, 0, 0);
-                bincode::serialize(&req).unwrap()
+                let keypair = Keypair::new();
+                let header =
+                    RepairRequestHeader::new(keypair.pubkey(), target_id, timestamp(), 0);
+                let req = RepairProtocol::WindowIndex {
+                    header,
+                    slot,
+                    shred_index: 0,
+                };
+                ServeRepair::repair_proto_to_bytes(&req, Some(&keypair)).unwrap()
             }
             DataType::RepairShred => {
                 let slot = 100;
-                let req = RepairProtocol::HighestWindowIndexWithNonce(
-                    get_repair_contact(nodes),
+                let keypair = Keypair::new();
+                let header =
+                    RepairRequestHeader::new(keypair.pubkey(), target_id, timestamp(), 0);
+                let req = RepairProtocol::HighestWindowIndex {
+                    header,
                     slot,
-                    0,
-                    0,
-                );
-                bincode::serialize(&req).unwrap()
+                    shred_index: 0,
+                };
+                ServeRepair::repair_proto_to_bytes(&req, Some(&keypair)).unwrap()
             }
             DataType::RepairOrphan => {
                 let slot = 100;
-                let req = RepairProtocol::OrphanWithNonce(get_repair_contact(nodes), slot, 0);
-                bincode::serialize(&req).unwrap()
+                let keypair = Keypair::new();
+                let header =
+                    RepairRequestHeader::new(keypair.pubkey(), target_id, timestamp(), 0);
+                let req = RepairProtocol::Orphan { header, slot };
+                ServeRepair::repair_proto_to_bytes(&req, Some(&keypair)).unwrap()
             }
             DataType::Random => {
                 vec![0; params.data_size]
@@ -574,7 +576,7 @@ fn run_dos(
                 if params.data_type == DataType::Random {
                     thread_rng().fill(&mut data[..]);
                 }
-                let res = socket.send_to(&data, target);
+                let res = socket.send_to(&data, target_addr);
                 if res.is_err() {
                     error_count += 1;
                 }
diff --git a/gossip/src/ping_pong.rs b/gossip/src/ping_pong.rs
index e115077b99..6c3a219cfd 100644
--- a/gossip/src/ping_pong.rs
+++ b/gossip/src/ping_pong.rs
@@ -109,6 +109,10 @@ impl Pong {
         };
         Ok(pong)
     }
+
+    pub fn from(&self) -> &Pubkey {
+        &self.from
+    }
 }
 
 impl Sanitize for Pong {
diff --git a/ledger/src/shred/stats.rs b/ledger/src/shred/stats.rs
index 49d4bea5b3..0c630e007e 100644
--- a/ledger/src/shred/stats.rs
+++ b/ledger/src/shred/stats.rs
@@ -30,6 +30,8 @@ pub struct ProcessShredsStats {
 pub struct ShredFetchStats {
     pub index_overrun: usize,
     pub shred_count: usize,
+    pub ping_count: usize,
+    pub ping_err_verify_count: usize,
     pub(crate) index_bad_deserialize: usize,
     pub(crate) index_out_of_bounds: usize,
     pub(crate) slot_bad_deserialize: usize,
@@ -115,6 +117,8 @@ impl ShredFetchStats {
             name,
             ("index_overrun", self.index_overrun, i64),
             ("shred_count", self.shred_count, i64),
+            ("ping_count", self.ping_count, i64),
+            ("ping_err_verify_count", self.ping_err_verify_count, i64),
             ("slot_bad_deserialize", self.slot_bad_deserialize, i64),
             ("index_bad_deserialize", self.index_bad_deserialize, i64),
             ("index_out_of_bounds", self.index_out_of_bounds, i64),
diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs
index 695eafb2bc..0d85d3e3ab 100644
--- a/sdk/src/feature_set.rs
+++ b/sdk/src/feature_set.rs
@@ -484,6 +484,10 @@ pub mod compact_vote_state_updates {
     solana_sdk::declare_id!("86HpNqzutEZwLcPxS6EHDcMNYWk6ikhteg9un7Y2PBKE");
 }
 
+pub mod sign_repair_requests {
+    solana_sdk::declare_id!("sigrs6u1EWeHuoKFkY8RR7qcSsPmrAeBBPESyf5pnYe");
+}
+
 pub mod concurrent_replay_of_forks {
     solana_sdk::declare_id!("9F2Dcu8xkBPKxiiy65XKPZYdCG3VZDpjDTuSmeYLozJe");
 }
@@ -607,6 +611,7 @@ lazy_static! {
         (loosen_cpi_size_restriction::id(), "loosen cpi size restrictions #26641"),
         (use_default_units_in_fee_calculation::id(), "use default units per instruction in fee calculation #26785"),
         (compact_vote_state_updates::id(), "Compact vote state updates to lower block size"),
+        (sign_repair_requests::id(), "sign repair requests #26834"),
         (concurrent_replay_of_forks::id(), "Allow slots from different forks to be replayed concurrently #26465"),
         (incremental_snapshot_only_incremental_hash_calculation::id(), "only hash accounts in incremental snapshot during incremental snapshot creation #26799"),
         /*************** ADD NEW FEATURES HERE ***************/
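For reference, the request-signing layout the patch introduces (a 4-byte bincode enum discriminant, then the 64-byte `signature` field of `RepairRequestHeader`, then the rest of the message) can be exercised in isolation. The following is a minimal standalone sketch under that assumption, mirroring `ServeRepair::repair_proto_to_bytes` and the signature check in `verify_signed_packet`; `sign_in_place` and `verify_layout` are illustrative names, not functions from this patch.

use solana_sdk::{
    pubkey::Pubkey,
    signature::{Keypair, Signature, Signer},
};

const SIG_BYTES: usize = 64; // solana_sdk::signature::SIGNATURE_BYTES

// Overwrite the placeholder signature field with a signature over the
// discriminant plus everything after the signature field, as
// repair_proto_to_bytes does above.
fn sign_in_place(payload: &mut [u8], keypair: &Keypair) {
    let signable = [&payload[..4], &payload[4 + SIG_BYTES..]].concat();
    let signature = keypair.sign_message(&signable);
    payload[4..4 + SIG_BYTES].copy_from_slice(signature.as_ref());
}

// Check the same byte ranges against the sender pubkey taken from the
// deserialized header, as verify_signed_packet does above.
fn verify_layout(payload: &[u8], sender: &Pubkey) -> bool {
    let signature = Signature::new(&payload[4..4 + SIG_BYTES]);
    let signed_data = [&payload[..4], &payload[4 + SIG_BYTES..]].concat();
    signature.verify(sender.as_ref(), &signed_data)
}

fn main() {
    let keypair = Keypair::new();
    // Stand-in buffer with the assumed layout: tag, signature placeholder, body.
    let mut payload = vec![0u8; 4 + SIG_BYTES + 32];
    payload[..4].copy_from_slice(&8u32.to_le_bytes()); // e.g. the WindowIndex variant tag
    sign_in_place(&mut payload, &keypair);
    assert!(verify_layout(&payload, &keypair.pubkey()));
}

A request signed this way passes the signature branch of `verify_signed_packet` provided the header's `recipient` matches the receiving node and its `timestamp` falls within `SIGNED_REPAIR_TIME_WINDOW`.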