From 6d2dae6ab0ad95ce26300c26952bc92e3a6bbeee Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Thu, 31 Aug 2023 12:08:10 -0700 Subject: [PATCH] send duplicate shred proofs for conflicting shred scenarios (#32965) * send duplicate shred proofs for conflicting shred scenarios The scenarios are multiple last_shred_in_slot shreds and coding shreds with conflicting erasure metas. * Pr feedback: deprecate shred_index for gossip duplicate shred proofs * Pr feedback * Remove extraneous dead code --- Cargo.lock | 1 + core/src/window_service.rs | 50 +- gossip/Cargo.toml | 1 + gossip/src/cluster_info.rs | 2 +- gossip/src/crds_gossip.rs | 2 +- gossip/src/duplicate_shred.rs | 695 ++++++++++++++++++++++++-- gossip/src/duplicate_shred_handler.rs | 4 - ledger/src/blockstore.rs | 163 +++--- ledger/src/blockstore_meta.rs | 9 + 9 files changed, 793 insertions(+), 134 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 142cfbd14..a529d4239 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6066,6 +6066,7 @@ dependencies = [ "solana-version", "solana-vote-program", "static_assertions", + "test-case", "thiserror", ] diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 88dc0034d..7efe98127 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -19,7 +19,7 @@ use { rayon::{prelude::*, ThreadPool}, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ - blockstore::{Blockstore, BlockstoreInsertionMetrics}, + blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred}, leader_schedule_cache::LeaderScheduleCache, shred::{self, Nonce, ReedSolomonCache, Shred}, }, @@ -138,23 +138,37 @@ impl WindowServiceMetrics { fn run_check_duplicate( cluster_info: &ClusterInfo, blockstore: &Blockstore, - shred_receiver: &Receiver, + shred_receiver: &Receiver, duplicate_slots_sender: &DuplicateSlotSender, ) -> Result<()> { - let check_duplicate = |shred: Shred| -> Result<()> { + let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> { let shred_slot = shred.slot(); - if !blockstore.has_duplicate_shreds_in_slot(shred_slot) { - if let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) { - cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?; + let (shred1, shred2) = match shred { + PossibleDuplicateShred::LastIndexConflict(shred, conflict) => (shred, conflict), + PossibleDuplicateShred::ErasureConflict(shred, conflict) => (shred, conflict), + PossibleDuplicateShred::Exists(shred) => { + // Unlike the other cases we have to wait until here to decide to handle the duplicate and store + // in blockstore. This is because the duplicate could have been part of the same insert batch, + // so we wait until the batch has been written. + if blockstore.has_duplicate_shreds_in_slot(shred_slot) { + return Ok(()); // A duplicate is already recorded + } + let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) else { + return Ok(()); // Not a duplicate + }; blockstore.store_duplicate_slot( shred_slot, - existing_shred_payload, - shred.into_payload(), + existing_shred_payload.clone(), + shred.clone().into_payload(), )?; - - duplicate_slots_sender.send(shred_slot)?; + (shred, existing_shred_payload) } - } + }; + + // Propagate duplicate proof through gossip + cluster_info.push_duplicate_shred(&shred1, &shred2)?; + // Notify duplicate consensus state machine + duplicate_slots_sender.send(shred_slot)?; Ok(()) }; @@ -226,7 +240,7 @@ fn run_insert( reed_solomon_cache: &ReedSolomonCache, ) -> Result<()> where - F: Fn(Shred), + F: Fn(PossibleDuplicateShred), { const RECV_TIMEOUT: Duration = Duration::from_millis(200); let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed"); @@ -370,7 +384,7 @@ impl WindowService { cluster_info: Arc, exit: Arc, blockstore: Arc, - duplicate_receiver: Receiver, + duplicate_receiver: Receiver, duplicate_slots_sender: DuplicateSlotSender, ) -> JoinHandle<()> { let handle_error = || { @@ -400,7 +414,7 @@ impl WindowService { blockstore: Arc, leader_schedule_cache: Arc, verified_receiver: Receiver>, - check_duplicate_sender: Sender, + check_duplicate_sender: Sender, completed_data_sets_sender: CompletedDataSetsSender, retransmit_sender: Sender>, outstanding_requests: Arc>, @@ -417,8 +431,8 @@ impl WindowService { Builder::new() .name("solWinInsert".to_string()) .spawn(move || { - let handle_duplicate = |shred| { - let _ = check_duplicate_sender.send(shred); + let handle_duplicate = |possible_duplicate_shred| { + let _ = check_duplicate_sender.send(possible_duplicate_shred); }; let mut metrics = BlockstoreInsertionMetrics::default(); let mut ws_metrics = WindowServiceMetrics::default(); @@ -551,7 +565,9 @@ mod test { }; assert_eq!(duplicate_shred.slot(), shreds[0].slot()); let duplicate_shred_slot = duplicate_shred.slot(); - sender.send(duplicate_shred.clone()).unwrap(); + sender + .send(PossibleDuplicateShred::Exists(duplicate_shred.clone())) + .unwrap(); assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot)); let keypair = Keypair::new(); let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp()); diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index 0b88a178e..c3736c2b8 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -54,6 +54,7 @@ thiserror = { workspace = true } [dev-dependencies] num_cpus = { workspace = true } serial_test = { workspace = true } +test-case = { workspace = true } [build-dependencies] rustc_version = { workspace = true } diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 3b3acad3c..4ebfb4bfc 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -271,7 +271,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "6T2sn92PMrTijsgncH3bBZL4K5GUowb442cCw4y4DuwV")] +#[frozen_abi(digest = "EnbW8mYTsPMndq9NkHLTkHJgduXvWSfSD6bBdmqQ8TiF")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 6ab52edd2..41a0e4c9a 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -149,7 +149,7 @@ impl CrdsGossip { let now = timestamp(); for entry in entries { if let Err(err) = crds.insert(entry, now, GossipRoute::LocalMessage) { - error!("push_duplicate_shred faild: {:?}", err); + error!("push_duplicate_shred failed: {:?}", err); } } Ok(()) diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs index 0d7d35d26..b1ceab79b 100644 --- a/gossip/src/duplicate_shred.rs +++ b/gossip/src/duplicate_shred.rs @@ -3,7 +3,7 @@ use { itertools::Itertools, solana_ledger::{ blockstore::BlockstoreError, - blockstore_meta::DuplicateSlotProof, + blockstore_meta::{DuplicateSlotProof, ErasureMeta}, shred::{self, Shred, ShredType}, }, solana_sdk::{ @@ -29,7 +29,7 @@ pub struct DuplicateShred { pub(crate) from: Pubkey, pub(crate) wallclock: u64, pub(crate) slot: Slot, - shred_index: u32, + _unused: u32, shred_type: ShredType, // Serialized DuplicateSlotProof split into chunks. num_chunks: u8, @@ -62,6 +62,10 @@ pub enum Error { InvalidDuplicateShreds, #[error("invalid duplicate slot proof")] InvalidDuplicateSlotProof, + #[error("invalid erasure meta conflict")] + InvalidErasureMetaConflict, + #[error("invalid last index conflict")] + InvalidLastIndexConflict, #[error("invalid signature")] InvalidSignature, #[error("invalid size limit")] @@ -74,8 +78,6 @@ pub enum Error { MissingDataChunk, #[error("(de)serialization error")] SerializationError(#[from] bincode::Error), - #[error("shred index mismatch")] - ShredIndexMismatch, #[error("shred type mismatch")] ShredTypeMismatch, #[error("slot mismatch")] @@ -86,33 +88,64 @@ pub enum Error { UnknownSlotLeader(Slot), } -// Asserts that the two shreds can indicate duplicate proof for -// the same triplet of (slot, shred-index, and shred-type_), and -// that they have valid signatures from the slot leader. +/// Check that `shred1` and `shred2` indicate a valid duplicate proof +/// - Must be for the same slot +/// - Must have the same `shred_type` +/// - Must both sigverify for the correct leader +/// - If `shred1` and `shred2` share the same index they must be not equal +/// - If `shred1` and `shred2` do not share the same index and are data shreds +/// verify that they indicate an index conflict. One of them must be the +/// LAST_SHRED_IN_SLOT, however the other shred must have a higher index. +/// - If `shred1` and `shred2` do not share the same index and are coding shreds +/// verify that they have conflicting erasure metas fn check_shreds(leader_schedule: Option, shred1: &Shred, shred2: &Shred) -> Result<(), Error> where F: FnOnce(Slot) -> Option, { if shred1.slot() != shred2.slot() { - Err(Error::SlotMismatch) - } else if shred1.index() != shred2.index() { - // TODO: Should also allow two coding shreds with different indices but - // same fec-set-index and mismatching erasure-config. - Err(Error::ShredIndexMismatch) - } else if shred1.shred_type() != shred2.shred_type() { - Err(Error::ShredTypeMismatch) - } else if shred1.payload() == shred2.payload() { - Err(Error::InvalidDuplicateShreds) - } else { - if let Some(leader_schedule) = leader_schedule { - let slot_leader = - leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader(shred1.slot()))?; - if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) { - return Err(Error::InvalidSignature); - } - } - Ok(()) + return Err(Error::SlotMismatch); } + + if shred1.shred_type() != shred2.shred_type() { + return Err(Error::ShredTypeMismatch); + } + + if let Some(leader_schedule) = leader_schedule { + let slot_leader = + leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader(shred1.slot()))?; + if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) { + return Err(Error::InvalidSignature); + } + } + + if shred1.index() == shred2.index() { + if shred1.payload() != shred2.payload() { + return Ok(()); + } + return Err(Error::InvalidDuplicateShreds); + } + + if shred1.shred_type() == ShredType::Data { + if shred1.last_in_slot() && shred2.index() > shred1.index() { + return Ok(()); + } + if shred2.last_in_slot() && shred1.index() > shred2.index() { + return Ok(()); + } + return Err(Error::InvalidLastIndexConflict); + } + + // This mirrors the current logic in blockstore to detect coding shreds with conflicting + // erasure sets. However this is not technically exhaustive, as any 2 shreds with + // different but overlapping erasure sets can be considered duplicate and need not be + // a part of the same fec set. Further work to enhance detection is planned in + // https://github.com/solana-labs/solana/issues/33037 + if shred1.fec_set_index() == shred2.fec_set_index() + && !ErasureMeta::check_erasure_consistency(shred1, shred2) + { + return Ok(()); + } + Err(Error::InvalidErasureMetaConflict) } pub(crate) fn from_shred( @@ -131,7 +164,7 @@ where } let other_shred = Shred::new_from_serialized_shred(other_payload)?; check_shreds(leader_schedule, &shred, &other_shred)?; - let (slot, shred_index, shred_type) = (shred.slot(), shred.index(), shred.shred_type()); + let (slot, shred_type) = (shred.slot(), shred.shred_type()); let proof = DuplicateSlotProof { shred1: shred.into_payload(), shred2: other_shred.into_payload(), @@ -151,28 +184,25 @@ where from: self_pubkey, wallclock, slot, - shred_index, shred_type, num_chunks, chunk_index: i as u8, chunk, + _unused: 0, }); Ok(chunks) } // Returns a predicate checking if a duplicate-shred chunk matches -// (slot, shred_index, shred_type) and has valid chunk_index. +// (slot, shred_type) and has valid chunk_index. fn check_chunk( slot: Slot, - shred_index: u32, shred_type: ShredType, num_chunks: u8, ) -> impl Fn(&DuplicateShred) -> Result<(), Error> { move |dup| { if dup.slot != slot { Err(Error::SlotMismatch) - } else if dup.shred_index != shred_index { - Err(Error::ShredIndexMismatch) } else if dup.shred_type != shred_type { Err(Error::ShredTypeMismatch) } else if dup.num_chunks != num_chunks { @@ -196,14 +226,13 @@ pub(crate) fn into_shreds( let mut chunks = chunks.into_iter(); let DuplicateShred { slot, - shred_index, shred_type, num_chunks, chunk_index, chunk, .. } = chunks.next().ok_or(Error::InvalidDuplicateShreds)?; - let check_chunk = check_chunk(slot, shred_index, shred_type, num_chunks); + let check_chunk = check_chunk(slot, shred_type, num_chunks); let mut data = HashMap::new(); data.insert(chunk_index, chunk); for chunk in chunks { @@ -231,15 +260,10 @@ pub(crate) fn into_shreds( let shred2 = Shred::new_from_serialized_shred(proof.shred2)?; if shred1.slot() != slot || shred2.slot() != slot { Err(Error::SlotMismatch) - } else if shred1.index() != shred_index || shred2.index() != shred_index { - Err(Error::ShredIndexMismatch) } else if shred1.shred_type() != shred_type || shred2.shred_type() != shred_type { Err(Error::ShredTypeMismatch) - } else if shred1.payload() == shred2.payload() { - Err(Error::InvalidDuplicateShreds) - } else if !shred1.verify(slot_leader) || !shred2.verify(slot_leader) { - Err(Error::InvalidSignature) } else { + check_shreds(Some(|_| Some(slot_leader).copied()), &shred1, &shred2)?; Ok((shred1, shred2)) } } @@ -267,6 +291,7 @@ pub(crate) mod tests { system_transaction, }, std::sync::Arc, + test_case::test_case, }; #[test] @@ -275,11 +300,11 @@ pub(crate) mod tests { from: Pubkey::new_unique(), wallclock: u64::MAX, slot: Slot::MAX, - shred_index: u32::MAX, shred_type: ShredType::Data, num_chunks: u8::MAX, chunk_index: u8::MAX, chunk: Vec::default(), + _unused: 0, }; assert_eq!( bincode::serialize(&dup).unwrap().len(), @@ -297,6 +322,71 @@ pub(crate) mod tests { shredder: &Shredder, keypair: &Keypair, ) -> Shred { + let (mut data_shreds, _) = new_rand_shreds( + rng, + next_shred_index, + next_shred_index, + 5, + true, + shredder, + keypair, + true, + ); + data_shreds.pop().unwrap() + } + + fn new_rand_data_shred( + rng: &mut R, + next_shred_index: u32, + shredder: &Shredder, + keypair: &Keypair, + merkle_variant: bool, + is_last_in_slot: bool, + ) -> Shred { + let (mut data_shreds, _) = new_rand_shreds( + rng, + next_shred_index, + next_shred_index, + 5, + merkle_variant, + shredder, + keypair, + is_last_in_slot, + ); + data_shreds.pop().unwrap() + } + + fn new_rand_coding_shreds( + rng: &mut R, + next_shred_index: u32, + num_entries: usize, + shredder: &Shredder, + keypair: &Keypair, + merkle_variant: bool, + ) -> Vec { + let (_, coding_shreds) = new_rand_shreds( + rng, + next_shred_index, + next_shred_index, + num_entries, + merkle_variant, + shredder, + keypair, + true, + ); + coding_shreds + } + + fn new_rand_shreds( + rng: &mut R, + next_shred_index: u32, + next_code_index: u32, + num_entries: usize, + merkle_variant: bool, + shredder: &Shredder, + keypair: &Keypair, + is_last_in_slot: bool, + ) -> (Vec, Vec) { let entries: Vec<_> = std::iter::repeat_with(|| { let tx = system_transaction::transfer( &Keypair::new(), // from @@ -310,30 +400,76 @@ pub(crate) mod tests { vec![tx], // transactions ) }) - .take(5) + .take(num_entries) .collect(); - let (mut data_shreds, _coding_shreds) = shredder.entries_to_shreds( + shredder.entries_to_shreds( keypair, &entries, - true, // is_last_in_slot + is_last_in_slot, next_shred_index, - next_shred_index, // next_code_index - true, // merkle_variant + next_code_index, // next_code_index + merkle_variant, &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), - ); - data_shreds.swap_remove(0) + ) } - #[test] - fn test_duplicate_shred_round_trip() { + fn from_shred_bypass_checks( + shred: Shred, + self_pubkey: Pubkey, // Pubkey of my node broadcasting crds value. + other_shred: Shred, + wallclock: u64, + max_size: usize, // Maximum serialized size of each DuplicateShred. + ) -> Result, Error> { + let (slot, shred_type) = (shred.slot(), shred.shred_type()); + let proof = DuplicateSlotProof { + shred1: shred.into_payload(), + shred2: other_shred.into_payload(), + }; + let data = bincode::serialize(&proof)?; + let chunk_size = max_size - DUPLICATE_SHRED_HEADER_SIZE; + let chunks: Vec<_> = data.chunks(chunk_size).map(Vec::from).collect(); + let num_chunks = u8::try_from(chunks.len())?; + let chunks = chunks + .into_iter() + .enumerate() + .map(move |(i, chunk)| DuplicateShred { + from: self_pubkey, + wallclock, + slot, + shred_type, + num_chunks, + chunk_index: i as u8, + chunk, + _unused: 0, + }); + Ok(chunks) + } + + #[test_case(true ; "merkle")] + #[test_case(false ; "legacy")] + fn test_duplicate_shred_round_trip(merkle_variant: bool) { let mut rng = rand::thread_rng(); let leader = Arc::new(Keypair::new()); let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); let next_shred_index = rng.gen_range(0..32_000); - let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); - let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader); + let shred1 = new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + true, + ); + let shred2 = new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + true, + ); let leader_schedule = |s| { if s == slot { Some(leader.pubkey()) @@ -356,4 +492,461 @@ pub(crate) mod tests { assert_eq!(shred1, shred3); assert_eq!(shred2, shred4); } + + #[test_case(true ; "merkle")] + #[test_case(false ; "legacy")] + fn test_duplicate_shred_invalid(merkle_variant: bool) { + let mut rng = rand::thread_rng(); + let leader = Arc::new(Keypair::new()); + let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); + let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); + let next_shred_index = rng.gen_range(0..32_000); + let leader_schedule = |s| { + if s == slot { + Some(leader.pubkey()) + } else { + None + } + }; + let data_shred = new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + true, + ); + let coding_shreds = new_rand_coding_shreds( + &mut rng, + next_shred_index, + 10, + &shredder, + &leader, + merkle_variant, + ); + let test_cases = vec![ + // Same data_shred + (data_shred.clone(), data_shred), + // Same coding_shred + (coding_shreds[0].clone(), coding_shreds[0].clone()), + ]; + for (shred1, shred2) in test_cases.into_iter() { + assert_matches!( + from_shred( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.payload().clone(), + Some(leader_schedule), + rng.gen(), // wallclock + 512, // max_size + ) + .err() + .unwrap(), + Error::InvalidDuplicateShreds + ); + + let chunks: Vec<_> = from_shred_bypass_checks( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.clone(), + rng.gen(), // wallclock + 512, // max_size + ) + .unwrap() + .collect(); + assert!(chunks.len() > 4); + + assert_matches!( + into_shreds(&leader.pubkey(), chunks).err().unwrap(), + Error::InvalidDuplicateSlotProof + ); + } + } + + #[test_case(true ; "merkle")] + #[test_case(false ; "legacy")] + fn test_latest_index_conflict_round_trip(merkle_variant: bool) { + let mut rng = rand::thread_rng(); + let leader = Arc::new(Keypair::new()); + let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); + let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); + let next_shred_index = rng.gen_range(0..31_000); + let leader_schedule = |s| { + if s == slot { + Some(leader.pubkey()) + } else { + None + } + }; + let test_cases = vec![ + ( + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + true, + ), + new_rand_data_shred( + &mut rng, + next_shred_index + 1, + &shredder, + &leader, + merkle_variant, + false, + ), + ), + ( + new_rand_data_shred( + &mut rng, + next_shred_index + 1, + &shredder, + &leader, + merkle_variant, + false, + ), + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + true, + ), + ), + ( + new_rand_data_shred( + &mut rng, + next_shred_index + 100, + &shredder, + &leader, + merkle_variant, + true, + ), + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + true, + ), + ), + ( + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + true, + ), + new_rand_data_shred( + &mut rng, + next_shred_index + 100, + &shredder, + &leader, + merkle_variant, + true, + ), + ), + ]; + for (shred1, shred2) in test_cases.into_iter() { + let chunks: Vec<_> = from_shred( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.payload().clone(), + Some(leader_schedule), + rng.gen(), // wallclock + 512, // max_size + ) + .unwrap() + .collect(); + assert!(chunks.len() > 4); + let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap(); + assert_eq!(shred1, shred3); + assert_eq!(shred2, shred4); + } + } + + #[test_case(true ; "merkle")] + #[test_case(false ; "legacy")] + fn test_latest_index_conflict_invalid(merkle_variant: bool) { + let mut rng = rand::thread_rng(); + let leader = Arc::new(Keypair::new()); + let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); + let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); + let next_shred_index = rng.gen_range(0..31_000); + let leader_schedule = |s| { + if s == slot { + Some(leader.pubkey()) + } else { + None + } + }; + let test_cases = vec![ + ( + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + false, + ), + new_rand_data_shred( + &mut rng, + next_shred_index + 1, + &shredder, + &leader, + merkle_variant, + true, + ), + ), + ( + new_rand_data_shred( + &mut rng, + next_shred_index + 1, + &shredder, + &leader, + merkle_variant, + true, + ), + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + false, + ), + ), + ( + new_rand_data_shred( + &mut rng, + next_shred_index + 100, + &shredder, + &leader, + merkle_variant, + false, + ), + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + false, + ), + ), + ( + new_rand_data_shred( + &mut rng, + next_shred_index, + &shredder, + &leader, + merkle_variant, + false, + ), + new_rand_data_shred( + &mut rng, + next_shred_index + 100, + &shredder, + &leader, + merkle_variant, + false, + ), + ), + ]; + for (shred1, shred2) in test_cases.into_iter() { + assert_matches!( + from_shred( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.payload().clone(), + Some(leader_schedule), + rng.gen(), // wallclock + 512, // max_size + ) + .err() + .unwrap(), + Error::InvalidLastIndexConflict + ); + + let chunks: Vec<_> = from_shred_bypass_checks( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.clone(), + rng.gen(), // wallclock + 512, // max_size + ) + .unwrap() + .collect(); + assert!(chunks.len() > 4); + + assert_matches!( + into_shreds(&leader.pubkey(), chunks).err().unwrap(), + Error::InvalidLastIndexConflict + ); + } + } + + #[test_case(true ; "merkle")] + #[test_case(false ; "legacy")] + fn test_erasure_meta_conflict_round_trip(merkle_variant: bool) { + let mut rng = rand::thread_rng(); + let leader = Arc::new(Keypair::new()); + let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); + let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); + let next_shred_index = rng.gen_range(0..31_000); + let leader_schedule = |s| { + if s == slot { + Some(leader.pubkey()) + } else { + None + } + }; + let coding_shreds = new_rand_coding_shreds( + &mut rng, + next_shred_index, + 10, + &shredder, + &leader, + merkle_variant, + ); + let coding_shreds_bigger = new_rand_coding_shreds( + &mut rng, + next_shred_index, + 13, + &shredder, + &leader, + merkle_variant, + ); + let coding_shreds_smaller = new_rand_coding_shreds( + &mut rng, + next_shred_index, + 7, + &shredder, + &leader, + merkle_variant, + ); + + // Same fec-set, different index, different erasure meta + let test_cases = vec![ + (coding_shreds[0].clone(), coding_shreds_bigger[1].clone()), + (coding_shreds[0].clone(), coding_shreds_smaller[1].clone()), + ]; + for (shred1, shred2) in test_cases.into_iter() { + let chunks: Vec<_> = from_shred( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.payload().clone(), + Some(leader_schedule), + rng.gen(), // wallclock + 512, // max_size + ) + .unwrap() + .collect(); + assert!(chunks.len() > 4); + let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap(); + assert_eq!(shred1, shred3); + assert_eq!(shred2, shred4); + } + } + + #[test_case(true ; "merkle")] + #[test_case(false ; "legacy")] + fn test_erasure_meta_conflict_invalid(merkle_variant: bool) { + let mut rng = rand::thread_rng(); + let leader = Arc::new(Keypair::new()); + let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0); + let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap(); + let next_shred_index = rng.gen_range(0..31_000); + let leader_schedule = |s| { + if s == slot { + Some(leader.pubkey()) + } else { + None + } + }; + let coding_shreds = new_rand_coding_shreds( + &mut rng, + next_shred_index, + 10, + &shredder, + &leader, + merkle_variant, + ); + let coding_shreds_different_fec = new_rand_coding_shreds( + &mut rng, + next_shred_index + 1, + 10, + &shredder, + &leader, + merkle_variant, + ); + let coding_shreds_different_fec_and_size = new_rand_coding_shreds( + &mut rng, + next_shred_index + 1, + 13, + &shredder, + &leader, + merkle_variant, + ); + + let test_cases = vec![ + // Different index, different fec set, same erasure meta + ( + coding_shreds[0].clone(), + coding_shreds_different_fec[1].clone(), + ), + // Different index, different fec set, different erasure meta + ( + coding_shreds[0].clone(), + coding_shreds_different_fec_and_size[1].clone(), + ), + // Different index, same fec set, same erasure meta + (coding_shreds[0].clone(), coding_shreds[1].clone()), + ( + coding_shreds_different_fec[0].clone(), + coding_shreds_different_fec[1].clone(), + ), + ( + coding_shreds_different_fec_and_size[0].clone(), + coding_shreds_different_fec_and_size[1].clone(), + ), + ]; + for (shred1, shred2) in test_cases.into_iter() { + assert_matches!( + from_shred( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.payload().clone(), + Some(leader_schedule), + rng.gen(), // wallclock + 512, // max_size + ) + .err() + .unwrap(), + Error::InvalidErasureMetaConflict + ); + + let chunks: Vec<_> = from_shred_bypass_checks( + shred1.clone(), + Pubkey::new_unique(), // self_pubkey + shred2.clone(), + rng.gen(), // wallclock + 512, // max_size + ) + .unwrap() + .collect(); + assert!(chunks.len() > 4); + + assert_matches!( + into_shreds(&leader.pubkey(), chunks).err().unwrap(), + Error::InvalidErasureMetaConflict + ); + } + } } diff --git a/gossip/src/duplicate_shred_handler.rs b/gossip/src/duplicate_shred_handler.rs index 7789404b9..366eef091 100644 --- a/gossip/src/duplicate_shred_handler.rs +++ b/gossip/src/duplicate_shred_handler.rs @@ -243,9 +243,6 @@ mod tests { Some(Error::SlotMismatch) => { new_rand_shred(&mut rng, next_shred_index, &shredder1, &my_keypair) } - Some(Error::ShredIndexMismatch) => { - new_rand_shred(&mut rng, next_shred_index + 1, &shredder, &my_keypair) - } Some(Error::InvalidDuplicateShreds) => shred1.clone(), _ => new_rand_shred(&mut rng, next_shred_index, &shredder, &my_keypair), }; @@ -313,7 +310,6 @@ mod tests { for error in [ Error::InvalidSignature, Error::SlotMismatch, - Error::ShredIndexMismatch, Error::InvalidDuplicateShreds, ] { match create_duplicate_proof( diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index c5a6c0f45..6660252e4 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -135,9 +135,26 @@ impl std::fmt::Display for InsertDataShredError { } } +#[derive(Eq, PartialEq, Debug, Clone)] +pub enum PossibleDuplicateShred { + Exists(Shred), // Blockstore has another shred in its spot + LastIndexConflict(/* original */ Shred, /* conflict */ Vec), // The index of this shred conflicts with `slot_meta.last_index` + ErasureConflict(/* original */ Shred, /* conflict */ Vec), // The coding shred has a conflict in the erasure_meta +} + +impl PossibleDuplicateShred { + pub fn slot(&self) -> Slot { + match self { + Self::Exists(shred) => shred.slot(), + Self::LastIndexConflict(shred, _) => shred.slot(), + Self::ErasureConflict(shred, _) => shred.slot(), + } + } +} + pub struct InsertResults { completed_data_set_infos: Vec, - duplicate_shreds: Vec, + duplicate_shreds: Vec, } /// A "complete data set" is a range of [`Shred`]s that combined in sequence carry a single @@ -1047,7 +1064,7 @@ impl Blockstore { metrics: &mut BlockstoreInsertionMetrics, ) -> Result> where - F: Fn(Shred), + F: Fn(PossibleDuplicateShred), { let InsertResults { completed_data_set_infos, @@ -1165,7 +1182,7 @@ impl Blockstore { write_batch: &mut WriteBatch, just_received_shreds: &mut HashMap, index_meta_time_us: &mut u64, - duplicate_shreds: &mut Vec, + duplicate_shreds: &mut Vec, is_trusted: bool, shred_source: ShredSource, metrics: &mut BlockstoreInsertionMetrics, @@ -1184,7 +1201,7 @@ impl Blockstore { if !is_trusted { if index_meta.coding().contains(shred_index) { metrics.num_coding_shreds_exists += 1; - duplicate_shreds.push(shred); + duplicate_shreds.push(PossibleDuplicateShred::Exists(shred)); return false; } @@ -1201,8 +1218,6 @@ impl Blockstore { .unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap()) }); - // TODO: handle_duplicate is not invoked and so duplicate shreds are - // not gossiped to the rest of cluster. if !erasure_meta.check_coding_shred(&shred) { metrics.num_coding_shreds_invalid_erasure_config += 1; let conflicting_shred = self.find_conflicting_coding_shred( @@ -1212,15 +1227,22 @@ impl Blockstore { just_received_shreds, ); if let Some(conflicting_shred) = conflicting_shred { - if self - .store_duplicate_if_not_existing( - slot, + if !self.has_duplicate_shreds_in_slot(slot) { + if self + .store_duplicate_slot( + slot, + conflicting_shred.clone(), + shred.payload().clone(), + ) + .is_err() + { + warn!("bad duplicate store.."); + } + + duplicate_shreds.push(PossibleDuplicateShred::ErasureConflict( + shred.clone(), conflicting_shred, - shred.payload().clone(), - ) - .is_err() - { - warn!("bad duplicate store.."); + )); } } else { datapoint_info!("bad-conflict-shred", ("slot", slot, i64)); @@ -1338,7 +1360,7 @@ impl Blockstore { just_inserted_shreds: &mut HashMap, index_meta_time_us: &mut u64, is_trusted: bool, - duplicate_shreds: &mut Vec, + duplicate_shreds: &mut Vec, leader_schedule: Option<&LeaderScheduleCache>, shred_source: ShredSource, ) -> std::result::Result, InsertDataShredError> { @@ -1360,7 +1382,7 @@ impl Blockstore { if !is_trusted { if Self::is_data_shred_present(&shred, slot_meta, index_meta.data()) { - duplicate_shreds.push(shred); + duplicate_shreds.push(PossibleDuplicateShred::Exists(shred)); return Err(InsertDataShredError::Exists); } @@ -1385,6 +1407,7 @@ impl Blockstore { &self.last_root, leader_schedule, shred_source, + duplicate_shreds, ) { return Err(InsertDataShredError::InvalidShred); } @@ -1466,6 +1489,7 @@ impl Blockstore { last_root: &RwLock, leader_schedule: Option<&LeaderScheduleCache>, shred_source: ShredSource, + duplicate_shreds: &mut Vec, ) -> bool { let shred_index = u64::from(shred.index()); let slot = shred.slot(); @@ -1483,21 +1507,25 @@ impl Blockstore { let leader_pubkey = leader_schedule .and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None)); - let ending_shred: Cow> = self.get_data_shred_from_just_inserted_or_db( - just_inserted_shreds, - slot, - last_index.unwrap(), - ); + if !self.has_duplicate_shreds_in_slot(slot) { + let ending_shred: Vec = self + .get_data_shred_from_just_inserted_or_db( + just_inserted_shreds, + slot, + last_index.unwrap(), + ) + .into_owned(); - if self - .store_duplicate_if_not_existing( - slot, - ending_shred.into_owned(), - shred.payload().clone(), - ) - .is_err() - { - warn!("store duplicate error"); + if self + .store_duplicate_slot(slot, ending_shred.clone(), shred.payload().clone()) + .is_err() + { + warn!("store duplicate error"); + } + duplicate_shreds.push(PossibleDuplicateShred::LastIndexConflict( + shred.clone(), + ending_shred, + )); } datapoint_error!( @@ -1518,21 +1546,25 @@ impl Blockstore { let leader_pubkey = leader_schedule .and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None)); - let ending_shred: Cow> = self.get_data_shred_from_just_inserted_or_db( - just_inserted_shreds, - slot, - slot_meta.received - 1, - ); + if !self.has_duplicate_shreds_in_slot(slot) { + let ending_shred: Vec = self + .get_data_shred_from_just_inserted_or_db( + just_inserted_shreds, + slot, + slot_meta.received - 1, + ) + .into_owned(); - if self - .store_duplicate_if_not_existing( - slot, - ending_shred.into_owned(), - shred.payload().clone(), - ) - .is_err() - { - warn!("store duplicate error"); + if self + .store_duplicate_slot(slot, ending_shred.clone(), shred.payload().clone()) + .is_err() + { + warn!("store duplicate error"); + } + duplicate_shreds.push(PossibleDuplicateShred::LastIndexConflict( + shred.clone(), + ending_shred, + )); } datapoint_error!( @@ -3224,19 +3256,6 @@ impl Blockstore { self.duplicate_slots_cf.delete(slot) } - pub fn store_duplicate_if_not_existing( - &self, - slot: Slot, - shred1: Vec, - shred2: Vec, - ) -> Result<()> { - if !self.has_duplicate_shreds_in_slot(slot) { - self.store_duplicate_slot(slot, shred1, shred2) - } else { - Ok(()) - } - } - pub fn get_first_duplicate_proof(&self) -> Option<(Slot, DuplicateSlotProof)> { let mut iter = self .db @@ -6571,6 +6590,7 @@ pub mod tests { &last_root, None, ShredSource::Repaired, + &mut Vec::new(), )); // Trying to insert another "is_last" shred with index < the received index should fail // skip over shred 7 @@ -6587,6 +6607,7 @@ pub mod tests { panic!("Shred in unexpected format") } }; + let mut duplicate_shreds = vec![]; assert!(!blockstore.should_insert_data_shred( &shred7, &slot_meta, @@ -6594,8 +6615,15 @@ pub mod tests { &last_root, None, ShredSource::Repaired, + &mut duplicate_shreds, )); assert!(blockstore.has_duplicate_shreds_in_slot(0)); + assert_eq!(duplicate_shreds.len(), 1); + assert_matches!( + duplicate_shreds[0], + PossibleDuplicateShred::LastIndexConflict(_, _) + ); + assert_eq!(duplicate_shreds[0].slot(), 0); // Insert all pending shreds let mut shred8 = shreds[8].clone(); @@ -6604,18 +6632,30 @@ pub mod tests { // Trying to insert a shred with index > the "is_last" shred should fail if shred8.is_data() { - shred8.set_slot(slot_meta.last_index.unwrap() + 1); + shred8.set_index((slot_meta.last_index.unwrap() + 1) as u32); } else { panic!("Shred in unexpected format") } + duplicate_shreds.clear(); + blockstore.duplicate_slots_cf.delete(0).unwrap(); + assert!(!blockstore.has_duplicate_shreds_in_slot(0)); assert!(!blockstore.should_insert_data_shred( - &shred7, + &shred8, &slot_meta, &HashMap::new(), &last_root, None, ShredSource::Repaired, + &mut duplicate_shreds, )); + + assert_eq!(duplicate_shreds.len(), 1); + assert_matches!( + duplicate_shreds[0], + PossibleDuplicateShred::LastIndexConflict(_, _) + ); + assert_eq!(duplicate_shreds[0].slot(), 0); + assert!(blockstore.has_duplicate_shreds_in_slot(0)); } #[test] @@ -6701,7 +6741,10 @@ pub mod tests { ShredSource::Turbine, &mut BlockstoreInsertionMetrics::default(), )); - assert_eq!(duplicate_shreds, vec![coding_shred]); + assert_eq!( + duplicate_shreds, + vec![PossibleDuplicateShred::Exists(coding_shred)] + ); } #[test] diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 76cf2b3de..79954ee96 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -352,6 +352,15 @@ impl ErasureMeta { self == &other } + /// Returns true if both shreds are coding shreds and have a + /// consistent erasure config + pub fn check_erasure_consistency(shred1: &Shred, shred2: &Shred) -> bool { + let Some(coding_shred) = Self::from_coding_shred(shred1) else { + return false; + }; + coding_shred.check_coding_shred(shred2) + } + pub(crate) fn config(&self) -> ErasureConfig { self.config }