From 97f2bcff691e5fb4460f1363679cacb3437e39e6 Mon Sep 17 00:00:00 2001 From: carllin Date: Tue, 19 May 2020 12:38:18 -0700 Subject: [PATCH] master: Add nonce to shreds repairs, add shred data size to header (#10109) * Add nonce to shreds/repairs * Add data shred size to header Co-authored-by: Carl --- Cargo.lock | 1 + core/benches/shredder.rs | 10 +- core/src/broadcast_stage.rs | 2 +- .../broadcast_stage/standard_broadcast_run.rs | 8 +- core/src/lib.rs | 1 + core/src/repair_response.rs | 112 +++++++ core/src/repair_service.rs | 59 ++-- core/src/serve_repair.rs | 275 +++++++++++------- core/src/window_service.rs | 126 +++++--- dos/Cargo.toml | 1 + dos/src/main.rs | 12 +- ledger/src/blockstore.rs | 23 +- ledger/src/shred.rs | 135 +++++++-- ledger/src/sigverify_shreds.rs | 72 +++-- ledger/tests/shred.rs | 17 +- 15 files changed, 598 insertions(+), 256 deletions(-) create mode 100644 core/src/repair_response.rs diff --git a/Cargo.lock b/Cargo.lock index fa9522d98..d196ba8f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4029,6 +4029,7 @@ dependencies = [ "rayon", "solana-clap-utils", "solana-core", + "solana-ledger", "solana-logger", "solana-net-utils", "solana-runtime", diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs index 7a7b07f9a..4efc6d4d4 100644 --- a/core/benches/shredder.rs +++ b/core/benches/shredder.rs @@ -32,7 +32,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) { let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD; let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size; // ~1Mb - let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64; + let num_ticks = max_ticks_per_n_shreds(1, Some(SIZE_OF_DATA_SHRED_PAYLOAD)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); bencher.iter(|| { let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0, 0).unwrap(); @@ -46,7 +46,11 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) { let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD; let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size; let txs_per_entry = 128; - let num_entries = max_entries_per_n_shred(&make_test_entry(txs_per_entry), num_shreds as u64); + let num_entries = max_entries_per_n_shred( + &make_test_entry(txs_per_entry), + num_shreds as u64, + Some(shred_size), + ); let entries = make_large_unchained_entries(txs_per_entry, num_entries); // 1Mb bencher.iter(|| { @@ -61,7 +65,7 @@ fn bench_deshredder(bencher: &mut Bencher) { let shred_size = SIZE_OF_DATA_SHRED_PAYLOAD; // ~10Mb let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size; - let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64; + let num_ticks = max_ticks_per_n_shreds(1, Some(shred_size)) * num_shreds as u64; let entries = create_ticks(num_ticks, 0, Hash::default()); let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp, 0, 0).unwrap(); let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0; diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 2768867a9..9e9e6c31b 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -464,7 +464,7 @@ pub mod test { Vec, Vec, ) { - let num_entries = max_ticks_per_n_shreds(num); + let num_entries = max_ticks_per_n_shreds(num, None); let (data_shreds, _) = make_slot_entries(slot, 0, num_entries); let keypair = Arc::new(Keypair::new()); let shredder = Shredder::new(slot, 0, RECOMMENDED_FEC_RATE, keypair, 0, 0) diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 
c05fa31e0..3f3a888da 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -428,7 +428,7 @@ mod test { let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(leader_info.info)); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut genesis_config = create_genesis_config(10_000).genesis_config; - genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot) + 1; + genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot, None) + 1; let bank0 = Arc::new(Bank::new(&genesis_config)); ( blockstore, @@ -537,7 +537,11 @@ mod test { // Interrupting the slot should cause the unfinished_slot and stats to reset let num_shreds = 1; assert!(num_shreds < num_shreds_per_slot); - let ticks1 = create_ticks(max_ticks_per_n_shreds(num_shreds), 0, genesis_config.hash()); + let ticks1 = create_ticks( + max_ticks_per_n_shreds(num_shreds, None), + 0, + genesis_config.hash(), + ); let receive_results = ReceiveResults { entries: ticks1.clone(), time_elapsed: Duration::new(2, 0), diff --git a/core/src/lib.rs b/core/src/lib.rs index 07a29e18d..58c691878 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -35,6 +35,7 @@ pub mod poh_recorder; pub mod poh_service; pub mod progress_map; pub mod pubkey_references; +pub mod repair_response; pub mod repair_service; pub mod replay_stage; mod result; diff --git a/core/src/repair_response.rs b/core/src/repair_response.rs new file mode 100644 index 000000000..fef52628c --- /dev/null +++ b/core/src/repair_response.rs @@ -0,0 +1,112 @@ +use solana_ledger::{ + blockstore::Blockstore, + shred::{Nonce, SIZE_OF_NONCE}, +}; +use solana_perf::packet::limited_deserialize; +use solana_sdk::{clock::Slot, packet::Packet}; +use std::{io, net::SocketAddr}; + +pub fn repair_response_packet( + blockstore: &Blockstore, + slot: Slot, + shred_index: u64, + dest: &SocketAddr, + nonce: Nonce, +) -> Option { + let shred = blockstore + .get_data_shred(slot, shred_index) + .expect("Blockstore could not get data shred"); + shred + .map(|shred| repair_response_packet_from_shred(shred, dest, nonce)) + .unwrap_or(None) +} + +pub fn repair_response_packet_from_shred( + shred: Vec, + dest: &SocketAddr, + nonce: Nonce, +) -> Option { + let mut packet = Packet::default(); + packet.meta.size = shred.len() + SIZE_OF_NONCE; + if packet.meta.size > packet.data.len() { + return None; + } + packet.meta.set_addr(dest); + packet.data[..shred.len()].copy_from_slice(&shred); + let mut wr = io::Cursor::new(&mut packet.data[shred.len()..]); + bincode::serialize_into(&mut wr, &nonce).expect("Buffer not large enough to fit nonce"); + Some(packet) +} + +pub fn nonce(buf: &[u8]) -> Option { + if buf.len() < SIZE_OF_NONCE { + None + } else { + limited_deserialize(&buf[buf.len() - SIZE_OF_NONCE..]).ok() + } +} + +#[cfg(test)] +mod test { + use super::*; + use solana_ledger::{ + shred::{Shred, Shredder}, + sigverify_shreds::verify_shred_cpu, + }; + use solana_sdk::signature::{Keypair, Signer}; + use std::{ + collections::HashMap, + net::{IpAddr, Ipv4Addr}, + }; + + fn run_test_sigverify_shred_cpu_repair(slot: Slot) { + solana_logger::setup(); + let mut shred = Shred::new_from_data( + slot, + 0xc0de, + 0xdead, + Some(&[1, 2, 3, 4]), + true, + true, + 0, + 0, + 0xc0de, + ); + assert_eq!(shred.slot(), slot); + let keypair = Keypair::new(); + Shredder::sign_shred(&keypair, &mut shred); + trace!("signature {}", shred.common_header.signature); + let nonce = 9; + let mut packet = repair_response_packet_from_shred( 
+ shred.payload, + &SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080), + nonce, + ) + .unwrap(); + packet.meta.repair = true; + + let leader_slots = [(slot, keypair.pubkey().to_bytes())] + .iter() + .cloned() + .collect(); + let rv = verify_shred_cpu(&packet, &leader_slots); + assert_eq!(rv, Some(1)); + + let wrong_keypair = Keypair::new(); + let leader_slots = [(slot, wrong_keypair.pubkey().to_bytes())] + .iter() + .cloned() + .collect(); + let rv = verify_shred_cpu(&packet, &leader_slots); + assert_eq!(rv, Some(0)); + + let leader_slots = HashMap::new(); + let rv = verify_shred_cpu(&packet, &leader_slots); + assert_eq!(rv, None); + } + + #[test] + fn test_sigverify_shred_cpu_repair() { + run_test_sigverify_shred_cpu_repair(0xdead_c0de); + } +} diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 52aadd2fa..01ec5cb70 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -5,12 +5,13 @@ use crate::{ cluster_slots::ClusterSlots, consensus::VOTE_THRESHOLD_SIZE, result::Result, - serve_repair::{RepairType, ServeRepair}, + serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE}, }; use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use solana_ledger::{ bank_forks::BankForks, blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta}, + shred::Nonce, }; use solana_runtime::bank::Bank; use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp}; @@ -104,7 +105,7 @@ impl RepairService { &blockstore, &exit, &repair_socket, - &cluster_info, + cluster_info, repair_info, &cluster_slots, ) @@ -118,19 +119,19 @@ impl RepairService { blockstore: &Blockstore, exit: &AtomicBool, repair_socket: &UdpSocket, - cluster_info: &Arc, + cluster_info: Arc, repair_info: RepairInfo, - cluster_slots: &Arc, + cluster_slots: &ClusterSlots, ) { let serve_repair = ServeRepair::new(cluster_info.clone()); let id = cluster_info.id(); - Self::initialize_lowest_slot(id, blockstore, cluster_info); + Self::initialize_lowest_slot(id, blockstore, &cluster_info); let mut repair_stats = RepairStats::default(); let mut last_stats = Instant::now(); let mut duplicate_slot_repair_statuses = HashMap::new(); Self::initialize_epoch_slots( blockstore, - cluster_info, + &cluster_info, &repair_info.completed_slots_receiver, ); loop { @@ -144,7 +145,7 @@ impl RepairService { let lowest_slot = blockstore.lowest_slot(); Self::update_lowest_slot(&id, lowest_slot, &cluster_info); Self::update_completed_slots(&repair_info.completed_slots_receiver, &cluster_info); - cluster_slots.update(new_root, cluster_info, &repair_info.bank_forks); + cluster_slots.update(new_root, &cluster_info, &repair_info.bank_forks); let new_duplicate_slots = Self::find_new_duplicate_slots( &duplicate_slot_repair_statuses, blockstore, @@ -178,27 +179,19 @@ impl RepairService { if let Ok(repairs) = repairs { let mut cache = HashMap::new(); - let reqs: Vec<((SocketAddr, Vec), RepairType)> = repairs - .into_iter() - .filter_map(|repair_request| { - serve_repair - .repair_request( - &cluster_slots, - &repair_request, - &mut cache, - &mut repair_stats, - ) - .map(|result| (result, repair_request)) - .ok() - }) - .collect(); - - for ((to, req), _) in reqs { - repair_socket.send_to(&req, to).unwrap_or_else(|e| { - info!("{} repair req send_to({}) error {:?}", id, to, e); - 0 - }); - } + repairs.into_iter().for_each(|repair_request| { + if let Ok((to, req)) = serve_repair.repair_request( + &cluster_slots, + repair_request, + &mut cache, + &mut repair_stats, + ) 
{ + repair_socket.send_to(&req, to).unwrap_or_else(|e| { + info!("{} repair req send_to({}) error {:?}", id, to, e); + 0 + }); + } + }); } if last_stats.elapsed().as_secs() > 1 { @@ -326,6 +319,7 @@ impl RepairService { &repair_addr, serve_repair, repair_stats, + DEFAULT_NONCE, ) { info!("repair req send_to({}) error {:?}", repair_addr, e); } @@ -346,8 +340,9 @@ impl RepairService { to: &SocketAddr, serve_repair: &ServeRepair, repair_stats: &mut RepairStats, + nonce: Nonce, ) -> Result<()> { - let req = serve_repair.map_repair_request(&repair_type, repair_stats)?; + let req = serve_repair.map_repair_request(&repair_type, repair_stats, nonce)?; repair_socket.send_to(&req, to)?; Ok(()) } @@ -740,7 +735,7 @@ mod test { let blockstore = Blockstore::open(&blockstore_path).unwrap(); let slots: Vec = vec![1, 3, 5, 7, 8]; - let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1; + let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1; let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot); for (mut slot_shreds, _) in shreds.into_iter() { @@ -850,7 +845,7 @@ mod test { ); // Insert some shreds to create a SlotMeta, should make repairs - let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1; + let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1; let (mut shreds, _) = make_slot_entries(dead_slot, dead_slot - 1, num_entries_per_slot); blockstore .insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false) @@ -883,7 +878,7 @@ mod test { }; // Insert some shreds to create a SlotMeta, - let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1; + let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1; let (mut shreds, _) = make_slot_entries(dead_slot, dead_slot - 1, num_entries_per_slot); blockstore .insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false) diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index edf0ac5ae..55f8787a0 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -2,16 +2,17 @@ use crate::{ cluster_info::{ClusterInfo, ClusterInfoError}, cluster_slots::ClusterSlots, contact_info::ContactInfo, + repair_response, repair_service::RepairStats, result::{Error, Result}, weighted_shuffle::weighted_best, }; use bincode::serialize; -use solana_ledger::blockstore::Blockstore; +use solana_ledger::{blockstore::Blockstore, shred::Nonce}; use solana_measure::measure::Measure; use solana_measure::thread_mem_usage; use solana_metrics::{datapoint_debug, inc_new_counter_debug}; -use solana_perf::packet::{limited_deserialize, Packet, Packets, PacketsRecycler}; +use solana_perf::packet::{limited_deserialize, Packets, PacketsRecycler}; use solana_sdk::{ clock::Slot, pubkey::Pubkey, @@ -30,6 +31,7 @@ use std::{ /// the number of slots to respond with when responding to `Orphan` requests pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10; +pub const DEFAULT_NONCE: u32 = 42; #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] pub enum RepairType { @@ -65,6 +67,9 @@ pub enum RepairProtocol { WindowIndex(ContactInfo, u64, u64), HighestWindowIndex(ContactInfo, u64, u64), Orphan(ContactInfo, u64), + WindowIndexWithNonce(ContactInfo, u64, u64, Nonce), + HighestWindowIndexWithNonce(ContactInfo, u64, u64, Nonce), + OrphanWithNonce(ContactInfo, u64, Nonce), } #[derive(Clone)] @@ -107,6 +112,9 @@ impl ServeRepair { RepairProtocol::WindowIndex(ref from, _, _) => from, RepairProtocol::HighestWindowIndex(ref from, _, _) => from, RepairProtocol::Orphan(ref from, _) => from, + 
RepairProtocol::WindowIndexWithNonce(ref from, _, _, _) => from, + RepairProtocol::HighestWindowIndexWithNonce(ref from, _, _, _) => from, + RepairProtocol::OrphanWithNonce(ref from, _, _) => from, } } @@ -130,7 +138,7 @@ impl ServeRepair { let (res, label) = { match &request { - RepairProtocol::WindowIndex(from, slot, shred_index) => { + RepairProtocol::WindowIndexWithNonce(_, slot, shred_index, nonce) => { stats.window_index += 1; ( Self::run_window_request( @@ -141,12 +149,12 @@ impl ServeRepair { &me.read().unwrap().my_info, *slot, *shred_index, + *nonce, ), - "WindowIndex", + "WindowIndexWithNonce", ) } - - RepairProtocol::HighestWindowIndex(_, slot, highest_index) => { + RepairProtocol::HighestWindowIndexWithNonce(_, slot, highest_index, nonce) => { stats.highest_window_index += 1; ( Self::run_highest_window_request( @@ -155,11 +163,12 @@ impl ServeRepair { blockstore, *slot, *highest_index, + *nonce, ), - "HighestWindowIndex", + "HighestWindowIndexWithNonce", ) } - RepairProtocol::Orphan(_, slot) => { + RepairProtocol::OrphanWithNonce(_, slot, nonce) => { stats.orphan += 1; ( Self::run_orphan( @@ -168,10 +177,12 @@ impl ServeRepair { blockstore, *slot, MAX_ORPHAN_REPAIR_RESPONSES, + *nonce, ), - "Orphan", + "OrphanWithNonce", ) } + _ => (None, "Unsupported repair type"), } }; @@ -331,20 +342,36 @@ impl ServeRepair { }); } - fn window_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result> { - let req = RepairProtocol::WindowIndex(self.my_info.clone(), slot, shred_index); + fn window_index_request_bytes( + &self, + slot: Slot, + shred_index: u64, + nonce: Nonce, + ) -> Result> { + let req = + RepairProtocol::WindowIndexWithNonce(self.my_info.clone(), slot, shred_index, nonce); let out = serialize(&req)?; Ok(out) } - fn window_highest_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result> { - let req = RepairProtocol::HighestWindowIndex(self.my_info.clone(), slot, shred_index); + fn window_highest_index_request_bytes( + &self, + slot: Slot, + shred_index: u64, + nonce: Nonce, + ) -> Result> { + let req = RepairProtocol::HighestWindowIndexWithNonce( + self.my_info.clone(), + slot, + shred_index, + nonce, + ); let out = serialize(&req)?; Ok(out) } - fn orphan_bytes(&self, slot: Slot) -> Result> { - let req = RepairProtocol::Orphan(self.my_info.clone(), slot); + fn orphan_bytes(&self, slot: Slot, nonce: Nonce) -> Result> { + let req = RepairProtocol::OrphanWithNonce(self.my_info.clone(), slot, nonce); let out = serialize(&req)?; Ok(out) } @@ -352,24 +379,25 @@ impl ServeRepair { pub fn repair_request( &self, cluster_slots: &ClusterSlots, - repair_request: &RepairType, + repair_request: RepairType, cache: &mut RepairCache, repair_stats: &mut RepairStats, ) -> Result<(SocketAddr, Vec)> { // find a peer that appears to be accepting replication and has the desired slot, as indicated // by a valid tvu port location - if cache.get(&repair_request.slot()).is_none() { - let repair_peers: Vec<_> = self.cluster_info.repair_peers(repair_request.slot()); + let slot = repair_request.slot(); + if cache.get(&slot).is_none() { + let repair_peers: Vec<_> = self.cluster_info.repair_peers(slot); if repair_peers.is_empty() { return Err(ClusterInfoError::NoPeers.into()); } - let weights = cluster_slots.compute_weights(repair_request.slot(), &repair_peers); - cache.insert(repair_request.slot(), (repair_peers, weights)); + let weights = cluster_slots.compute_weights(slot, &repair_peers); + cache.insert(slot, (repair_peers, weights)); } - let (repair_peers, weights) = 
cache.get(&repair_request.slot()).unwrap(); + let (repair_peers, weights) = cache.get(&slot).unwrap(); let n = weighted_best(&weights, Pubkey::new_rand().to_bytes()); let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port - let out = self.map_repair_request(repair_request, repair_stats)?; + let out = self.map_repair_request(&repair_request, repair_stats, DEFAULT_NONCE)?; Ok((addr, out)) } @@ -391,19 +419,20 @@ impl ServeRepair { &self, repair_request: &RepairType, repair_stats: &mut RepairStats, + nonce: Nonce, ) -> Result> { match repair_request { RepairType::Shred(slot, shred_index) => { repair_stats.shred.update(*slot); - Ok(self.window_index_request_bytes(*slot, *shred_index)?) + Ok(self.window_index_request_bytes(*slot, *shred_index, nonce)?) } RepairType::HighestShred(slot, shred_index) => { repair_stats.highest_shred.update(*slot); - Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?) + Ok(self.window_highest_index_request_bytes(*slot, *shred_index, nonce)?) } RepairType::Orphan(slot) => { repair_stats.orphan.update(*slot); - Ok(self.orphan_bytes(*slot)?) + Ok(self.orphan_bytes(*slot, nonce)?) } } } @@ -416,12 +445,19 @@ impl ServeRepair { me: &ContactInfo, slot: Slot, shred_index: u64, + nonce: Nonce, ) -> Option { if let Some(blockstore) = blockstore { // Try to find the requested index in one of the slots - let packet = Self::get_data_shred_as_packet(blockstore, slot, shred_index, from_addr); + let packet = repair_response::repair_response_packet( + blockstore, + slot, + shred_index, + from_addr, + nonce, + ); - if let Ok(Some(packet)) = packet { + if let Some(packet) = packet { inc_new_counter_debug!("serve_repair-window-request-ledger", 1); return Some(Packets::new_with_recycler_data( recycler, @@ -449,15 +485,20 @@ impl ServeRepair { blockstore: Option<&Arc>, slot: Slot, highest_index: u64, + nonce: Nonce, ) -> Option { let blockstore = blockstore?; // Try to find the requested index in one of the slots let meta = blockstore.meta(slot).ok()??; if meta.received > highest_index { // meta.received must be at least 1 by this point - let packet = - Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr) - .ok()??; + let packet = repair_response::repair_response_packet( + blockstore, + slot, + meta.received - 1, + from_addr, + nonce, + )?; return Some(Packets::new_with_recycler_data( recycler, "run_highest_window_request", @@ -473,6 +514,7 @@ impl ServeRepair { blockstore: Option<&Arc>, mut slot: Slot, max_responses: usize, + nonce: Nonce, ) -> Option { let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan"); if let Some(blockstore) = blockstore { @@ -481,9 +523,14 @@ impl ServeRepair { if meta.received == 0 { break; } - let packet = - Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr); - if let Ok(Some(packet)) = packet { + let packet = repair_response::repair_response_packet( + blockstore, + slot, + meta.received - 1, + from_addr, + nonce, + ); + if let Some(packet) = packet { res.packets.push(packet); } if meta.is_parent_set() && res.packets.len() <= max_responses { @@ -498,28 +545,12 @@ impl ServeRepair { } Some(res) } - - fn get_data_shred_as_packet( - blockstore: &Arc, - slot: Slot, - shred_index: u64, - dest: &SocketAddr, - ) -> Result> { - let data = blockstore.get_data_shred(slot, shred_index)?; - Ok(data.map(|data| { - let mut packet = Packet::default(); - packet.meta.size = data.len(); - packet.meta.set_addr(dest); - 
packet.data.copy_from_slice(&data); - packet - })) - } } #[cfg(test)] mod tests { use super::*; - use crate::result::Error; + use crate::{repair_response, result::Error}; use solana_ledger::get_tmp_ledger_path; use solana_ledger::{ blockstore::make_many_slot_entries, @@ -530,9 +561,13 @@ mod tests { }; use solana_sdk::{hash::Hash, pubkey::Pubkey, timing::timestamp}; - /// test run_window_requestwindow requests respond with the right shred, and do not overrun #[test] - fn run_highest_window_request() { + fn test_run_highest_window_request() { + run_highest_window_request(5, 3, 9); + } + + /// test run_window_request responds with the right shred, and do not overrun + fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) { let recycler = PacketsRecycler::default(); solana_logger::setup(); let ledger_path = get_tmp_ledger_path!(); @@ -544,41 +579,49 @@ mod tests { Some(&blockstore), 0, 0, + nonce, ); assert!(rv.is_none()); let _ = fill_blockstore_slot_with_ticks( &blockstore, - max_ticks_per_n_shreds(1) + 1, - 2, - 1, + max_ticks_per_n_shreds(1, None) + 1, + slot, + slot - num_slots + 1, Hash::default(), ); + let index = 1; let rv = ServeRepair::run_highest_window_request( &recycler, &socketaddr_any!(), Some(&blockstore), - 2, - 1, - ); + slot, + index, + nonce, + ) + .expect("packets"); + let rv: Vec = rv - .expect("packets") .packets .into_iter() - .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok()) + .filter_map(|b| { + assert_eq!(repair_response::nonce(&b.data[..]).unwrap(), nonce); + Shred::new_from_serialized_shred(b.data.to_vec()).ok() + }) .collect(); assert!(!rv.is_empty()); - let index = blockstore.meta(2).unwrap().unwrap().received - 1; + let index = blockstore.meta(slot).unwrap().unwrap().received - 1; assert_eq!(rv[0].index(), index as u32); - assert_eq!(rv[0].slot(), 2); + assert_eq!(rv[0].slot(), slot); let rv = ServeRepair::run_highest_window_request( &recycler, &socketaddr_any!(), Some(&blockstore), - 2, + slot, index + 1, + nonce, ); assert!(rv.is_none()); } @@ -586,9 +629,13 @@ mod tests { Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); } - /// test window requests respond with the right shred, and do not overrun #[test] - fn run_window_request() { + fn test_run_window_request() { + run_window_request(2, 9); + } + + /// test window requests respond with the right shred, and do not overrun + fn run_window_request(slot: Slot, nonce: Nonce) { let recycler = PacketsRecycler::default(); solana_logger::setup(); let ledger_path = get_tmp_ledger_path!(); @@ -615,12 +662,13 @@ mod tests { &socketaddr_any!(), Some(&blockstore), &me, + slot, 0, - 0, + nonce, ); assert!(rv.is_none()); let mut common_header = ShredCommonHeader::default(); - common_header.slot = 2; + common_header.slot = slot; common_header.index = 1; let mut data_header = DataShredHeader::default(); data_header.parent_offset = 1; @@ -634,24 +682,28 @@ mod tests { .insert_shreds(vec![shred_info], None, false) .expect("Expect successful ledger write"); + let index = 1; let rv = ServeRepair::run_window_request( &recycler, &me, &socketaddr_any!(), Some(&blockstore), &me, - 2, - 1, - ); - assert!(!rv.is_none()); + slot, + index, + nonce, + ) + .expect("packets"); let rv: Vec = rv - .expect("packets") .packets .into_iter() - .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok()) + .filter_map(|b| { + assert_eq!(repair_response::nonce(&b.data[..]).unwrap(), nonce); + Shred::new_from_serialized_shred(b.data.to_vec()).ok() + }) 
.collect(); assert_eq!(rv[0].index(), 1); - assert_eq!(rv[0].slot(), 2); + assert_eq!(rv[0].slot(), slot); } Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); @@ -665,7 +717,7 @@ mod tests { let serve_repair = ServeRepair::new(cluster_info.clone()); let rv = serve_repair.repair_request( &cluster_slots, - &RepairType::Shred(0, 0), + RepairType::Shred(0, 0), &mut HashMap::new(), &mut RepairStats::default(), ); @@ -691,7 +743,7 @@ mod tests { let rv = serve_repair .repair_request( &cluster_slots, - &RepairType::Shred(0, 0), + RepairType::Shred(0, 0), &mut HashMap::new(), &mut RepairStats::default(), ) @@ -723,7 +775,7 @@ mod tests { let rv = serve_repair .repair_request( &cluster_slots, - &RepairType::Shred(0, 0), + RepairType::Shred(0, 0), &mut HashMap::new(), &mut RepairStats::default(), ) @@ -739,52 +791,75 @@ mod tests { } #[test] - fn run_orphan() { + fn test_run_orphan() { + run_orphan(2, 3, 9); + } + + fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) { solana_logger::setup(); let recycler = PacketsRecycler::default(); let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); - let rv = - ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 2, 0); + let rv = ServeRepair::run_orphan( + &recycler, + &socketaddr_any!(), + Some(&blockstore), + slot, + 0, + nonce, + ); assert!(rv.is_none()); - // Create slots 1, 2, 3 with 5 shreds apiece - let (shreds, _) = make_many_slot_entries(1, 3, 5); + // Create slots [slot, slot + num_slots) with 5 shreds apiece + let (shreds, _) = make_many_slot_entries(slot, num_slots, 5); blockstore .insert_shreds(shreds, None, false) .expect("Expect successful ledger write"); - // We don't have slot 4, so we don't know how to service this requeset - let rv = - ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 4, 5); + // We don't have slot `slot + num_slots`, so we don't know how to service this request + let rv = ServeRepair::run_orphan( + &recycler, + &socketaddr_any!(), + Some(&blockstore), + slot + num_slots, + 5, + nonce, + ); assert!(rv.is_none()); - // For slot 3, we should return the highest shreds from slots 3, 2, 1 respectively - // for this request - let rv: Vec<_> = - ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 3, 5) - .expect("run_orphan packets") - .packets - .iter() - .cloned() - .collect(); - let expected: Vec<_> = (1..=3) + // For a orphan request for `slot + num_slots - 1`, we should return the highest shreds + // from slots in the range [slot, slot + num_slots - 1] + let rv: Vec<_> = ServeRepair::run_orphan( + &recycler, + &socketaddr_any!(), + Some(&blockstore), + slot + num_slots - 1, + 5, + nonce, + ) + .expect("run_orphan packets") + .packets + .iter() + .cloned() + .collect(); + + // Verify responses + let expected: Vec<_> = (slot..slot + num_slots) .rev() - .map(|slot| { + .filter_map(|slot| { let index = blockstore.meta(slot).unwrap().unwrap().received - 1; - ServeRepair::get_data_shred_as_packet( + repair_response::repair_response_packet( &blockstore, slot, index, &socketaddr_any!(), + nonce, ) - .unwrap() - .unwrap() }) .collect(); - assert_eq!(rv, expected) + assert_eq!(rv, expected); } Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 70a41823f..16628ab7c 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -4,8 +4,10 @@ use crate::{ 
cluster_info::ClusterInfo, cluster_slots::ClusterSlots, + repair_response, repair_service::{RepairInfo, RepairService}, result::{Error, Result}, + serve_repair::DEFAULT_NONCE, }; use crossbeam_channel::{ unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, @@ -13,24 +15,25 @@ use crossbeam_channel::{ use rayon::iter::IntoParallelRefMutIterator; use rayon::iter::ParallelIterator; use rayon::ThreadPool; -use solana_ledger::bank_forks::BankForks; -use solana_ledger::blockstore::{ - self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT, +use solana_ledger::{ + bank_forks::BankForks, + blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT}, + leader_schedule_cache::LeaderScheduleCache, + shred::{Nonce, Shred}, }; -use solana_ledger::leader_schedule_cache::LeaderScheduleCache; -use solana_ledger::shred::Shred; use solana_metrics::{inc_new_counter_debug, inc_new_counter_error}; use solana_perf::packet::Packets; use solana_rayon_threadlimit::get_thread_count; use solana_runtime::bank::Bank; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::timing::duration_as_ms; +use solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms}; use solana_streamer::streamer::PacketSender; -use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; -use std::thread::{self, Builder, JoinHandle}; -use std::time::{Duration, Instant}; +use std::{ + net::{SocketAddr, UdpSocket}, + sync::atomic::{AtomicBool, Ordering}, + sync::{Arc, RwLock}, + thread::{self, Builder, JoinHandle}, + time::{Duration, Instant}, +}; fn verify_shred_slot(shred: &Shred, root: u64) -> bool { if shred.is_data() { @@ -107,8 +110,15 @@ fn run_check_duplicate( Ok(()) } +fn verify_repair(_shred: &Shred, repair_info: &Option) -> bool { + repair_info + .as_ref() + .map(|repair_info| repair_info.nonce == DEFAULT_NONCE) + .unwrap_or(true) +} + fn run_insert( - shred_receiver: &CrossbeamReceiver>, + shred_receiver: &CrossbeamReceiver<(Vec, Vec>)>, blockstore: &Arc, leader_schedule_cache: &Arc, handle_duplicate: F, @@ -118,12 +128,16 @@ where F: Fn(Shred) -> (), { let timer = Duration::from_millis(200); - let mut shreds = shred_receiver.recv_timeout(timer)?; - - while let Ok(mut more_shreds) = shred_receiver.try_recv() { - shreds.append(&mut more_shreds) + let (mut shreds, mut repair_infos) = shred_receiver.recv_timeout(timer)?; + while let Ok((more_shreds, more_repair_infos)) = shred_receiver.try_recv() { + shreds.extend(more_shreds); + repair_infos.extend(more_repair_infos); } + assert_eq!(shreds.len(), repair_infos.len()); + let mut i = 0; + shreds.retain(|shred| (verify_repair(&shred, &repair_infos[i]), i += 1).0); + blockstore.insert_shreds_handle_duplicate( shreds, Some(leader_schedule_cache), @@ -136,7 +150,7 @@ where fn recv_window( blockstore: &Arc, - insert_shred_sender: &CrossbeamSender>, + insert_shred_sender: &CrossbeamSender<(Vec, Vec>)>, my_pubkey: &Pubkey, verified_receiver: &CrossbeamReceiver>, retransmit: &PacketSender, @@ -160,7 +174,7 @@ where inc_new_counter_debug!("streamer-recv_window-recv", total_packets); let last_root = blockstore.last_root(); - let shreds: Vec<_> = thread_pool.install(|| { + let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| { packets .par_iter_mut() .flat_map(|packets| { @@ -169,34 +183,58 @@ where .iter_mut() .filter_map(|packet| { if packet.meta.discard { - inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1); + 
inc_new_counter_debug!( + "streamer-recv_window-invalid_or_unnecessary_packet", + 1 + ); None - } else if let Ok(shred) = - Shred::new_from_serialized_shred(packet.data.to_vec()) - { - if shred_filter(&shred, last_root) { - // Mark slot as dead if the current shred is on the boundary - // of max shreds per slot. However, let the current shred - // get retransmitted. It'll allow peer nodes to see this shred - // and trigger them to mark the slot as dead. - if shred.index() >= (MAX_DATA_SHREDS_PER_SLOT - 1) as u32 { - let _ = blockstore.set_dead_slot(shred.slot()); + } else { + // shred fetch stage should be sending packets + // with sufficiently large buffers. Needed to ensure + // call to `new_from_serialized_shred` is safe. + assert_eq!(packet.data.len(), PACKET_DATA_SIZE); + let serialized_shred = packet.data.to_vec(); + if let Ok(shred) = Shred::new_from_serialized_shred(serialized_shred) { + let repair_info = { + if packet.meta.repair { + if let Some(nonce) = repair_response::nonce(&packet.data) { + let repair_info = RepairMeta { + _from_addr: packet.meta.addr(), + nonce, + }; + Some(repair_info) + } else { + // If can't parse the nonce, dump the packet + return None; + } + } else { + None + } + }; + if shred_filter(&shred, last_root) { + // Mark slot as dead if the current shred is on the boundary + // of max shreds per slot. However, let the current shred + // get retransmitted. It'll allow peer nodes to see this shred + // and trigger them to mark the slot as dead. + if shred.index() >= (MAX_DATA_SHREDS_PER_SLOT - 1) as u32 { + let _ = blockstore.set_dead_slot(shred.slot()); + } + packet.meta.slot = shred.slot(); + packet.meta.seed = shred.seed(); + Some((shred, repair_info)) + } else { + packet.meta.discard = true; + None } - packet.meta.slot = shred.slot(); - packet.meta.seed = shred.seed(); - Some(shred) } else { packet.meta.discard = true; None } - } else { - packet.meta.discard = true; - None } }) .collect::>() }) - .collect() + .unzip() }); trace!("{:?} shreds from packets", shreds.len()); @@ -210,7 +248,7 @@ where } } - insert_shred_sender.send(shreds)?; + insert_shred_sender.send((shreds, repair_infos))?; trace!( "Elapsed processing time in recv_window(): {}", @@ -220,6 +258,11 @@ where Ok(()) } +struct RepairMeta { + _from_addr: SocketAddr, + nonce: Nonce, +} + // Implement a destructor for the window_service thread to signal it exited // even on panics struct Finalizer { @@ -340,7 +383,7 @@ impl WindowService { exit: &Arc, blockstore: &Arc, leader_schedule_cache: &Arc, - insert_receiver: CrossbeamReceiver>, + insert_receiver: CrossbeamReceiver<(Vec, Vec>)>, duplicate_sender: CrossbeamSender, ) -> JoinHandle<()> { let exit = exit.clone(); @@ -390,7 +433,7 @@ impl WindowService { id: Pubkey, exit: &Arc, blockstore: &Arc, - insert_sender: CrossbeamSender>, + insert_sender: CrossbeamSender<(Vec, Vec>)>, verified_receiver: CrossbeamReceiver>, shred_filter: F, bank_forks: Option>>, @@ -488,13 +531,12 @@ impl WindowService { #[cfg(test)] mod test { use super::*; - use solana_ledger::shred::DataShredHeader; use solana_ledger::{ blockstore::{make_many_slot_entries, Blockstore}, entry::{create_ticks, Entry}, genesis_utils::create_genesis_config_with_leader, get_tmp_ledger_path, - shred::Shredder, + shred::{DataShredHeader, Shredder}, }; use solana_sdk::{ clock::Slot, diff --git a/dos/Cargo.toml b/dos/Cargo.toml index 37b4b6bf7..b46d12d9a 100644 --- a/dos/Cargo.toml +++ b/dos/Cargo.toml @@ -15,6 +15,7 @@ rand = "0.7.0" rayon = "1.3.0" solana-clap-utils = { path = "../clap-utils", 
version = "1.2.0" } solana-core = { path = "../core", version = "1.2.0" } +solana-ledger = { path = "../ledger", version = "1.2.0" } solana-logger = { path = "../logger", version = "1.2.0" } solana-net-utils = { path = "../net-utils", version = "1.2.0" } solana-runtime = { path = "../runtime", version = "1.2.0" } diff --git a/dos/src/main.rs b/dos/src/main.rs index 084a16f97..bf5aba815 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -1,9 +1,9 @@ use clap::{crate_description, crate_name, value_t, value_t_or_exit, App, Arg}; use log::*; use rand::{thread_rng, Rng}; -use solana_core::contact_info::ContactInfo; -use solana_core::gossip_service::discover; -use solana_core::serve_repair::RepairProtocol; +use solana_core::{ + contact_info::ContactInfo, gossip_service::discover, serve_repair::RepairProtocol, +}; use solana_sdk::pubkey::Pubkey; use std::net::{SocketAddr, UdpSocket}; use std::process::exit; @@ -46,17 +46,17 @@ fn run_dos( match data_type.as_str() { "repair_highest" => { let slot = 100; - let req = RepairProtocol::WindowIndex(contact, slot, 0); + let req = RepairProtocol::WindowIndexWithNonce(contact, slot, 0, 0); data = bincode::serialize(&req).unwrap(); } "repair_shred" => { let slot = 100; - let req = RepairProtocol::HighestWindowIndex(contact, slot, 0); + let req = RepairProtocol::HighestWindowIndexWithNonce(contact, slot, 0, 0); data = bincode::serialize(&req).unwrap(); } "repair_orphan" => { let slot = 100; - let req = RepairProtocol::Orphan(contact, slot); + let req = RepairProtocol::OrphanWithNonce(contact, slot, 0); data = bincode::serialize(&req).unwrap(); } "random" => { diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index e84227b96..4a1801456 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -2126,10 +2126,11 @@ impl Blockstore { let data_shreds = data_shreds?; assert!(data_shreds.last().unwrap().data_complete()); - let deshred_payload = Shredder::deshred(&data_shreds).map_err(|_| { - BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom( - "Could not reconstruct data block from constituent shreds".to_string(), - ))) + let deshred_payload = Shredder::deshred(&data_shreds).map_err(|e| { + BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!( + "Could not reconstruct data block from constituent shreds, error: {:?}", + e + )))) })?; debug!("{:?} shreds in last FEC set", data_shreds.len(),); @@ -3187,7 +3188,7 @@ pub mod tests { #[test] fn test_insert_get_bytes() { // Create enough entries to ensure there are at least two shreds created - let num_entries = max_ticks_per_n_shreds(1) + 1; + let num_entries = max_ticks_per_n_shreds(1, None) + 1; assert!(num_entries > 1); let (mut shreds, _) = make_slot_entries(0, 0, num_entries); @@ -3447,7 +3448,7 @@ pub mod tests { #[test] fn test_insert_data_shreds_basic() { // Create enough entries to ensure there are at least two shreds created - let num_entries = max_ticks_per_n_shreds(1) + 1; + let num_entries = max_ticks_per_n_shreds(1, None) + 1; assert!(num_entries > 1); let (mut shreds, entries) = make_slot_entries(0, 0, num_entries); @@ -3494,7 +3495,7 @@ pub mod tests { #[test] fn test_insert_data_shreds_reverse() { let num_shreds = 10; - let num_entries = max_ticks_per_n_shreds(num_shreds); + let num_entries = max_ticks_per_n_shreds(num_shreds, None); let (mut shreds, entries) = make_slot_entries(0, 0, num_entries); let num_shreds = shreds.len() as u64; @@ -3671,7 +3672,7 @@ pub mod tests { { let blockstore = 
Blockstore::open(&blockstore_path).unwrap(); // Create enough entries to ensure there are at least two shreds created - let min_entries = max_ticks_per_n_shreds(1) + 1; + let min_entries = max_ticks_per_n_shreds(1, None) + 1; for i in 0..4 { let slot = i; let parent_slot = if i == 0 { 0 } else { i - 1 }; @@ -4096,7 +4097,7 @@ pub mod tests { let blockstore = Blockstore::open(&blockstore_path).unwrap(); let num_slots = 15; // Create enough entries to ensure there are at least two shreds created - let entries_per_slot = max_ticks_per_n_shreds(1) + 1; + let entries_per_slot = max_ticks_per_n_shreds(1, None) + 1; assert!(entries_per_slot > 1); let (mut shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot); @@ -4463,7 +4464,7 @@ pub mod tests { let gap: u64 = 10; assert!(gap > 3); // Create enough entries to ensure there are at least two shreds created - let num_entries = max_ticks_per_n_shreds(1) + 1; + let num_entries = max_ticks_per_n_shreds(1, None) + 1; let entries = create_ticks(num_entries, 0, Hash::default()); let mut shreds = entries_to_test_shreds(entries, slot, 0, true, 0); let num_shreds = shreds.len(); @@ -4902,7 +4903,7 @@ pub mod tests { // Trying to insert value into slot <= than last root should fail { let mut coding_shred = - Shred::new_empty_from_header(shred, DataShredHeader::default(), coding); + Shred::new_empty_from_header(shred.clone(), DataShredHeader::default(), coding); let index = index_cf.get(coding_shred.slot()).unwrap().unwrap(); coding_shred.set_slot(*last_root.read().unwrap()); assert!(!Blockstore::should_insert_coding_shred( diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 02cac1aed..7bccf4fd4 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -24,26 +24,31 @@ use std::mem::size_of; use std::{sync::Arc, time::Instant}; use thiserror::Error; +pub type Nonce = u32; + /// The following constants are computed by hand, and hardcoded. /// `test_shred_constants` ensures that the values are correct. /// Constants are used over lazy_static for performance reasons. 
pub const SIZE_OF_COMMON_SHRED_HEADER: usize = 83; -pub const SIZE_OF_DATA_SHRED_HEADER: usize = 3; +pub const SIZE_OF_DATA_SHRED_HEADER: usize = 5; pub const SIZE_OF_CODING_SHRED_HEADER: usize = 6; pub const SIZE_OF_SIGNATURE: usize = 64; pub const SIZE_OF_SHRED_TYPE: usize = 1; pub const SIZE_OF_SHRED_SLOT: usize = 8; pub const SIZE_OF_SHRED_INDEX: usize = 4; +pub const SIZE_OF_NONCE: usize = 4; pub const SIZE_OF_DATA_SHRED_IGNORED_TAIL: usize = SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER; pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE - SIZE_OF_COMMON_SHRED_HEADER - SIZE_OF_DATA_SHRED_HEADER - - SIZE_OF_DATA_SHRED_IGNORED_TAIL; + - SIZE_OF_DATA_SHRED_IGNORED_TAIL + - SIZE_OF_NONCE; pub const OFFSET_OF_SHRED_TYPE: usize = SIZE_OF_SIGNATURE; pub const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE; pub const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; +pub const SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE; thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() .num_threads(get_thread_count()) @@ -108,6 +113,7 @@ pub struct ShredCommonHeader { pub struct DataShredHeader { pub parent_offset: u16, pub flags: u8, + pub size: u16, } /// The coding shred header has FEC information @@ -169,7 +175,8 @@ impl Shred { version: u16, fec_set_index: u32, ) -> Self { - let mut payload = vec![0; PACKET_DATA_SIZE]; + let payload_size = SHRED_PAYLOAD_SIZE; + let mut payload = vec![0; payload_size]; let common_header = ShredCommonHeader { slot, index, @@ -178,9 +185,13 @@ impl Shred { ..ShredCommonHeader::default() }; + let size = (data.map(|d| d.len()).unwrap_or(0) + + SIZE_OF_DATA_SHRED_HEADER + + SIZE_OF_COMMON_SHRED_HEADER) as u16; let mut data_header = DataShredHeader { parent_offset, flags: reference_tick.min(SHRED_TICK_REFERENCE_MASK), + size, }; if is_last_data { @@ -199,9 +210,10 @@ impl Shred { &common_header, ) .expect("Failed to write header into shred buffer"); + let size_of_data_shred_header = SIZE_OF_DATA_SHRED_HEADER; Self::serialize_obj_into( &mut start, - SIZE_OF_DATA_SHRED_HEADER, + size_of_data_shred_header, &mut payload, &data_header, ) @@ -219,11 +231,21 @@ impl Shred { } } - pub fn new_from_serialized_shred(payload: Vec) -> Result { + pub fn new_from_serialized_shred(mut payload: Vec) -> Result { let mut start = 0; let common_header: ShredCommonHeader = Self::deserialize_obj(&mut start, SIZE_OF_COMMON_SHRED_HEADER, &payload)?; + let slot = common_header.slot; + let expected_data_size = SHRED_PAYLOAD_SIZE; + // Safe because any payload from the network must have passed through + // window service, which implies payload wll be of size + // PACKET_DATA_SIZE, and `expected_data_size` <= PACKET_DATA_SIZE. + // + // On the other hand, if this function is called locally, the payload size should match + // the `expected_data_size`. 
+ assert!(payload.len() >= expected_data_size); + payload.truncate(expected_data_size); let shred = if common_header.shred_type == ShredType(CODING_SHRED) { let coding_header: CodingShredHeader = Self::deserialize_obj(&mut start, SIZE_OF_CODING_SHRED_HEADER, &payload)?; @@ -234,11 +256,12 @@ impl Shred { payload, } } else if common_header.shred_type == ShredType(DATA_SHRED) { + let size_of_data_shred_header = SIZE_OF_DATA_SHRED_HEADER; let data_header: DataShredHeader = - Self::deserialize_obj(&mut start, SIZE_OF_DATA_SHRED_HEADER, &payload)?; + Self::deserialize_obj(&mut start, size_of_data_shred_header, &payload)?; if u64::from(data_header.parent_offset) > common_header.slot { return Err(ShredError::InvalidParentOffset { - slot: common_header.slot, + slot, parent_offset: data_header.parent_offset, }); } @@ -260,7 +283,7 @@ impl Shred { data_header: DataShredHeader, coding_header: CodingShredHeader, ) -> Self { - let mut payload = vec![0; PACKET_DATA_SIZE]; + let mut payload = vec![0; SHRED_PAYLOAD_SIZE]; let mut start = 0; Self::serialize_obj_into( &mut start, @@ -396,7 +419,9 @@ impl Shred { } pub fn reference_tick_from_data(data: &[u8]) -> u8 { - let flags = data[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER - size_of::()]; + let flags = data[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER + - size_of::() + - size_of::()]; flags & SHRED_TICK_REFERENCE_MASK } @@ -629,7 +654,7 @@ impl Shredder { let start_index = data_shred_batch[0].common_header.index; // All information after coding shred field in a data shred is encoded - let valid_data_len = PACKET_DATA_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL; + let valid_data_len = SHRED_PAYLOAD_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL; let data_ptrs: Vec<_> = data_shred_batch .iter() .map(|data| &data.payload[..valid_data_len]) @@ -718,7 +743,7 @@ impl Shredder { if missing < first_index_in_fec_set + num_data { Shred::new_empty_data_shred().payload } else { - vec![0; PACKET_DATA_SIZE] + vec![0; SHRED_PAYLOAD_SIZE] } }) .collect(); @@ -733,6 +758,7 @@ impl Shredder { first_code_index: usize, slot: Slot, ) -> std::result::Result, reed_solomon_erasure::Error> { + Self::verify_consistent_shred_payload_sizes(&"try_recovery()", &shreds)?; let mut recovered_data = vec![]; let fec_set_size = num_data + num_coding; @@ -778,7 +804,7 @@ impl Shredder { let session = Session::new(num_data, num_coding)?; - let valid_data_len = PACKET_DATA_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL; + let valid_data_len = SHRED_PAYLOAD_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL; let coding_block_offset = SIZE_OF_CODING_SHRED_HEADER + SIZE_OF_COMMON_SHRED_HEADER; let mut blocks: Vec<(&mut [u8], bool)> = shred_bufs .iter_mut() @@ -823,6 +849,7 @@ impl Shredder { /// Combines all shreds to recreate the original buffer pub fn deshred(shreds: &[Shred]) -> std::result::Result, reed_solomon_erasure::Error> { let num_data = shreds.len(); + Self::verify_consistent_shred_payload_sizes(&"deshred()", shreds)?; let data_shred_bufs = { let first_index = shreds.first().unwrap().index() as usize; let last_shred = shreds.last().unwrap(); @@ -856,7 +883,7 @@ impl Shredder { } fn reassemble_payload(num_data: usize, data_shred_bufs: Vec<&Vec>) -> Vec { - let valid_data_len = PACKET_DATA_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL; + let valid_data_len = SHRED_PAYLOAD_SIZE - SIZE_OF_DATA_SHRED_IGNORED_TAIL; data_shred_bufs[..num_data] .iter() .flat_map(|data| { @@ -866,15 +893,43 @@ impl Shredder { .cloned() .collect() } + + fn verify_consistent_shred_payload_sizes( + caller: &str, + shreds: 
&[Shred], + ) -> std::result::Result<(), reed_solomon_erasure::Error> { + if shreds.is_empty() { + return Err(reed_solomon_erasure::Error::TooFewShardsPresent); + } + let slot = shreds[0].slot(); + for shred in shreds { + if shred.payload.len() != SHRED_PAYLOAD_SIZE { + error!( + "{} Shreds for slot: {} are inconsistent sizes. Expected: {} actual: {}", + caller, + slot, + SHRED_PAYLOAD_SIZE, + shred.payload.len() + ); + return Err(reed_solomon_erasure::Error::IncorrectShardSize); + } + } + + Ok(()) + } } -pub fn max_ticks_per_n_shreds(num_shreds: u64) -> u64 { +pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option) -> u64 { let ticks = create_ticks(1, 0, Hash::default()); - max_entries_per_n_shred(&ticks[0], num_shreds) + max_entries_per_n_shred(&ticks[0], num_shreds, shred_data_size) } -pub fn max_entries_per_n_shred(entry: &Entry, num_shreds: u64) -> u64 { - let shred_data_size = SIZE_OF_DATA_SHRED_PAYLOAD as u64; +pub fn max_entries_per_n_shred( + entry: &Entry, + num_shreds: u64, + shred_data_size: Option, +) -> u64 { + let shred_data_size = shred_data_size.unwrap_or(SIZE_OF_DATA_SHRED_PAYLOAD) as u64; let vec_size = bincode::serialized_size(&vec![entry]).unwrap(); let entry_size = bincode::serialized_size(entry).unwrap(); let count_size = vec_size - entry_size; @@ -892,7 +947,7 @@ pub fn verify_test_data_shred( is_last_in_slot: bool, is_last_in_fec_set: bool, ) { - assert_eq!(shred.payload.len(), PACKET_DATA_SIZE); + assert_eq!(shred.payload.len(), SHRED_PAYLOAD_SIZE); assert!(shred.is_data()); assert_eq!(shred.index(), index); assert_eq!(shred.slot(), slot); @@ -933,6 +988,14 @@ pub mod tests { SIZE_OF_DATA_SHRED_HEADER, serialized_size(&DataShredHeader::default()).unwrap() as usize ); + let data_shred_header_with_size = DataShredHeader { + size: 1000, + ..DataShredHeader::default() + }; + assert_eq!( + SIZE_OF_DATA_SHRED_HEADER, + serialized_size(&data_shred_header_with_size).unwrap() as usize + ); assert_eq!( SIZE_OF_SIGNATURE, bincode::serialized_size(&Signature::default()).unwrap() as usize @@ -952,17 +1015,15 @@ pub mod tests { } fn verify_test_code_shred(shred: &Shred, index: u32, slot: Slot, pk: &Pubkey, verify: bool) { - assert_eq!(shred.payload.len(), PACKET_DATA_SIZE); + assert_eq!(shred.payload.len(), SHRED_PAYLOAD_SIZE); assert!(!shred.is_data()); assert_eq!(shred.index(), index); assert_eq!(shred.slot(), slot); assert_eq!(verify, shred.verify(pk)); } - #[test] - fn test_data_shredder() { + fn run_test_data_shredder(slot: Slot) { let keypair = Arc::new(Keypair::new()); - let slot = 0x1234_5678_9abc_def0; // Test that parent cannot be > current slot assert_matches!( @@ -1052,6 +1113,11 @@ pub mod tests { assert_eq!(entries, deshred_entries); } + #[test] + fn test_data_shredder() { + run_test_data_shredder(0x1234_5678_9abc_def0); + } + #[test] fn test_deserialize_shred_payload() { let keypair = Arc::new(Keypair::new()); @@ -1144,22 +1210,21 @@ pub mod tests { ); } - #[test] - fn test_data_and_code_shredder() { + fn run_test_data_and_code_shredder(slot: Slot) { let keypair = Arc::new(Keypair::new()); - let slot = 0x1234_5678_9abc_def0; // Test that FEC rate cannot be > 1.0 assert_matches!( Shredder::new(slot, slot - 5, 1.001, keypair.clone(), 0, 0), Err(ShredError::InvalidFecRate(_)) ); - let shredder = Shredder::new(0x1234_5678_9abc_def0, slot - 5, 1.0, keypair.clone(), 0, 0) + let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0, 0) .expect("Failed in creating shredder"); // Create enough entries to make > 1 shred - let num_entries = 
max_ticks_per_n_shreds(1) + 1; + let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD; + let num_entries = max_ticks_per_n_shreds(1, Some(no_header_size)) + 1; let entries: Vec<_> = (0..num_entries) .map(|_| { let keypair0 = Keypair::new(); @@ -1191,9 +1256,12 @@ pub mod tests { } #[test] - fn test_recovery_and_reassembly() { + fn test_data_and_code_shredder() { + run_test_data_and_code_shredder(0x1234_5678_9abc_def0); + } + + fn run_test_recovery_and_reassembly(slot: Slot) { let keypair = Arc::new(Keypair::new()); - let slot = 0x1234_5678_9abc_def0; let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0, 0) .expect("Failed in creating shredder"); @@ -1203,7 +1271,9 @@ pub mod tests { let entry = Entry::new(&Hash::default(), 1, vec![tx0]); let num_data_shreds: usize = 5; - let num_entries = max_entries_per_n_shred(&entry, num_data_shreds as u64); + let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD; + let num_entries = + max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(no_header_size)); let entries: Vec<_> = (0..num_entries) .map(|_| { let keypair0 = Keypair::new(); @@ -1442,6 +1512,11 @@ pub mod tests { ); } + #[test] + fn test_recovery_and_reassembly() { + run_test_recovery_and_reassembly(0x1234_5678_9abc_def0); + } + #[test] fn test_shred_version() { let keypair = Arc::new(Keypair::new()); diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 1cfa08f21..c2d0f8258 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -1,5 +1,5 @@ #![allow(clippy::implicit_hasher)] -use crate::shred::ShredType; +use crate::shred::{ShredType, SIZE_OF_NONCE}; use rayon::{ iter::{ IndexedParallelIterator, IntoParallelIterator, IntoParallelRefMutIterator, ParallelIterator, @@ -16,9 +16,12 @@ use solana_perf::{ sigverify::{self, batch_size, TxOffset}, }; use solana_rayon_threadlimit::get_thread_count; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::Signature; -use solana_sdk::signature::{Keypair, Signer}; +use solana_sdk::{ + clock::Slot, + pubkey::Pubkey, + signature::Signature, + signature::{Keypair, Signer}, +}; use std::sync::Arc; use std::{collections::HashMap, mem::size_of}; @@ -40,13 +43,12 @@ lazy_static! { /// ... /// } /// Signature is the first thing in the packet, and slot is the first thing in the signed message. 
-fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap) -> Option { +pub fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap) -> Option { let sig_start = 0; let sig_end = size_of::(); let slot_start = sig_end + size_of::(); let slot_end = slot_start + size_of::(); let msg_start = sig_end; - let msg_end = packet.meta.size; if packet.meta.discard { return Some(0); } @@ -55,6 +57,11 @@ fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap) -> O return Some(0); } let slot: u64 = limited_deserialize(&packet.data[slot_start..slot_end]).ok()?; + let msg_end = if packet.meta.repair { + packet.meta.size.saturating_sub(SIZE_OF_NONCE) + } else { + packet.meta.size + }; trace!("slot {}", slot); let pubkey = slot_leaders.get(&slot)?; if packet.meta.size < sig_end { @@ -97,7 +104,7 @@ fn slot_key_data_for_gpu< ) -> (PinnedVec, TxOffset, usize) { //TODO: mark Pubkey::default shreds as failed after the GPU returns assert_eq!(slot_keys.get(&std::u64::MAX), Some(&T::default())); - let slots: Vec> = SIGVERIFY_THREAD_POOL.install(|| { + let slots: Vec> = SIGVERIFY_THREAD_POOL.install(|| { batches .into_par_iter() .map(|p| { @@ -185,13 +192,17 @@ fn shred_gpu_offsets( let mut msg_sizes = recycler_cache.offsets().allocate("shred_msg_sizes"); msg_sizes.set_pinnable(); let mut v_sig_lens = vec![]; - for batch in batches { + for batch in batches.iter() { let mut sig_lens = Vec::new(); - for packet in &batch.packets { + for packet in batch.packets.iter() { let sig_start = pubkeys_end; let sig_end = sig_start + size_of::(); let msg_start = sig_end; - let msg_end = sig_start + packet.meta.size; + let msg_end = if packet.meta.repair { + sig_start + packet.meta.size.saturating_sub(SIZE_OF_NONCE) + } else { + sig_start + packet.meta.size + }; signature_offsets.push(sig_start as u32); msg_start_offsets.push(msg_start as u32); let msg_size = if msg_end < msg_start { @@ -445,14 +456,12 @@ pub fn sign_shreds_gpu( #[cfg(test)] pub mod tests { use super::*; - use crate::shred::SIZE_OF_DATA_SHRED_PAYLOAD; - use crate::shred::{Shred, Shredder}; + use crate::shred::{Shred, Shredder, SIZE_OF_DATA_SHRED_PAYLOAD}; use solana_sdk::signature::{Keypair, Signer}; - #[test] - fn test_sigverify_shred_cpu() { + + fn run_test_sigverify_shred_cpu(slot: Slot) { solana_logger::setup(); let mut packet = Packet::default(); - let slot = 0xdead_c0de; let mut shred = Shred::new_from_data( slot, 0xc0de, @@ -492,10 +501,13 @@ pub mod tests { } #[test] - fn test_sigverify_shreds_cpu() { + fn test_sigverify_shred_cpu() { + run_test_sigverify_shred_cpu(0xdead_c0de); + } + + fn run_test_sigverify_shreds_cpu(slot: Slot) { solana_logger::setup(); let mut batch = [Packets::default()]; - let slot = 0xdead_c0de; let mut shred = Shred::new_from_data( slot, 0xc0de, @@ -542,12 +554,15 @@ pub mod tests { } #[test] - fn test_sigverify_shreds_gpu() { + fn test_sigverify_shreds_cpu() { + run_test_sigverify_shreds_cpu(0xdead_c0de); + } + + fn run_test_sigverify_shreds_gpu(slot: Slot) { solana_logger::setup(); let recycler_cache = RecyclerCache::default(); let mut batch = [Packets::default()]; - let slot = 0xdead_c0de; let mut shred = Shred::new_from_data( slot, 0xc0de, @@ -603,14 +618,17 @@ pub mod tests { } #[test] - fn test_sigverify_shreds_sign_gpu() { + fn test_sigverify_shreds_gpu() { + run_test_sigverify_shreds_gpu(0xdead_c0de); + } + + fn run_test_sigverify_shreds_sign_gpu(slot: Slot) { solana_logger::setup(); let recycler_cache = RecyclerCache::default(); let mut packets = Packets::default(); let num_packets = 32; let num_batches = 100; - 
let slot = 0xdead_c0de; packets.packets.resize(num_packets, Packet::default()); for (i, p) in packets.packets.iter_mut().enumerate() { let shred = Shred::new_from_data( @@ -650,11 +668,14 @@ pub mod tests { } #[test] - fn test_sigverify_shreds_sign_cpu() { + fn test_sigverify_shreds_sign_gpu() { + run_test_sigverify_shreds_sign_gpu(0xdead_c0de); + } + + fn run_test_sigverify_shreds_sign_cpu(slot: Slot) { solana_logger::setup(); let mut batch = [Packets::default()]; - let slot = 0xdead_c0de; let keypair = Keypair::new(); let shred = Shred::new_from_data( slot, @@ -685,4 +706,9 @@ pub mod tests { let rv = verify_shreds_cpu(&batch, &pubkeys); assert_eq!(rv, vec![vec![1]]); } + + #[test] + fn test_sigverify_shreds_sign_cpu() { + run_test_sigverify_shreds_sign_cpu(0xdead_c0de); + } } diff --git a/ledger/tests/shred.rs b/ledger/tests/shred.rs index 61e848f98..505f745f6 100644 --- a/ledger/tests/shred.rs +++ b/ledger/tests/shred.rs @@ -1,16 +1,15 @@ use solana_ledger::entry::Entry; use solana_ledger::shred::{ - max_entries_per_n_shred, verify_test_data_shred, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, + max_entries_per_n_shred, verify_test_data_shred, Shred, Shredder, + MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_DATA_SHRED_PAYLOAD, }; use solana_sdk::signature::{Keypair, Signer}; -use solana_sdk::{hash::Hash, system_transaction}; +use solana_sdk::{clock::Slot, hash::Hash, system_transaction}; use std::convert::TryInto; use std::sync::Arc; -#[test] -fn test_multi_fec_block_coding() { +fn run_test_multi_fec_block_coding(slot: Slot) { let keypair = Arc::new(Keypair::new()); - let slot = 0x1234_5678_9abc_def0; let shredder = Shredder::new(slot, slot - 5, 1.0, keypair.clone(), 0, 0) .expect("Failed in creating shredder"); @@ -20,7 +19,8 @@ fn test_multi_fec_block_coding() { let keypair1 = Keypair::new(); let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); let entry = Entry::new(&Hash::default(), 1, vec![tx0]); - let num_entries = max_entries_per_n_shred(&entry, num_data_shreds as u64); + let no_header_size = SIZE_OF_DATA_SHRED_PAYLOAD; + let num_entries = max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(no_header_size)); let entries: Vec<_> = (0..num_entries) .map(|_| { @@ -94,3 +94,8 @@ fn test_multi_fec_block_coding() { let result = Shredder::deshred(&all_shreds[..]).unwrap(); assert_eq!(serialized_entries[..], result[..serialized_entries.len()]); } + +#[test] +fn test_multi_fec_block_coding() { + run_test_multi_fec_block_coding(0x1234_5678_9abc_def0); +}
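
For readers tracing the nonce plumbing introduced above, here is a minimal standalone sketch (not part of the patch) of the wire layout that the new core/src/repair_response.rs implements: repair_response_packet_from_shred copies the serialized shred into the front of the packet buffer and appends the u32 nonce right after it, and because SHRED_PAYLOAD_SIZE is now defined as PACKET_DATA_SIZE - SIZE_OF_NONCE, the nonce always lands in the trailing four bytes of the packet, which is where repair_response::nonce() reads it back on the requesting side in window_service.rs. The constant value 1232 for PACKET_DATA_SIZE and the helper names make_repair_packet/read_nonce are assumptions for illustration only, and plain little-endian byte conversion stands in for the bincode serialization the patch uses.

use std::convert::TryInto;

type Nonce = u32;

// Assumed stand-in for solana_sdk::packet::PACKET_DATA_SIZE at the time of this patch.
const PACKET_DATA_SIZE: usize = 1232;
// Matches SIZE_OF_NONCE and SHRED_PAYLOAD_SIZE as defined in ledger/src/shred.rs above.
const SIZE_OF_NONCE: usize = 4;
const SHRED_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE;

// Hypothetical helper mirroring repair_response_packet_from_shred: shred bytes go in
// front, the nonce is appended immediately after them.
fn make_repair_packet(shred: &[u8], nonce: Nonce) -> Option<[u8; PACKET_DATA_SIZE]> {
    if shred.len() + SIZE_OF_NONCE > PACKET_DATA_SIZE {
        return None; // response would not fit in one packet
    }
    let mut data = [0u8; PACKET_DATA_SIZE];
    data[..shred.len()].copy_from_slice(shred);
    data[shred.len()..shred.len() + SIZE_OF_NONCE].copy_from_slice(&nonce.to_le_bytes());
    Some(data)
}

// Hypothetical helper mirroring repair_response::nonce: the requester reads the nonce
// back from the last SIZE_OF_NONCE bytes of the packet buffer.
fn read_nonce(buf: &[u8]) -> Option<Nonce> {
    if buf.len() < SIZE_OF_NONCE {
        return None;
    }
    let tail: [u8; SIZE_OF_NONCE] = buf[buf.len() - SIZE_OF_NONCE..].try_into().ok()?;
    Some(Nonce::from_le_bytes(tail))
}

fn main() {
    // A full-size shred payload leaves exactly SIZE_OF_NONCE bytes of headroom, so the
    // appended nonce always occupies the final four bytes of the packet.
    let shred = vec![7u8; SHRED_PAYLOAD_SIZE];
    let packet = make_repair_packet(&shred, 42).unwrap();
    assert_eq!(read_nonce(&packet), Some(42));
}

Keeping the nonce outside the shred payload also explains the sigverify_shreds.rs changes above: on packets flagged packet.meta.repair, the signed message length is computed as packet.meta.size minus SIZE_OF_NONCE, so signature verification simply ignores the appended nonce.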