send duplicate shred proofs for conflicting shred scenarios (#32965)

* send duplicate shred proofs for conflicting shred scenarios

The scenarios are multiple last_shred_in_slot shreds and
coding shreds with conflicting erasure metas.

* PR feedback: deprecate shred_index for gossip duplicate shred proofs

* PR feedback

* Remove extraneous dead code
Author: Ashwin Sekar 2023-08-31 12:08:10 -07:00 (committed by GitHub)
parent f8789c79a2
commit 6d2dae6ab0
9 changed files with 793 additions and 134 deletions
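For orientation, a rough sketch of the flow this change introduces, using simplified stand-in types (the real pieces are PossibleDuplicateShred and run_check_duplicate in the diffs below); this is an illustration under assumed simplifications, not the shipped code:

// Blockstore insertion now reports *why* a shred looks duplicate, so the
// window service can gossip a duplicate proof for all three cases.
enum DuplicateKind {
    Exists,                     // spot already occupied; confirmed against blockstore later
    LastIndexConflict(Vec<u8>), // conflicting payload captured at detection time
    ErasureConflict(Vec<u8>),   // conflicting coding-shred payload captured at detection time
}

fn handle(kind: DuplicateKind, shred_payload: Vec<u8>) {
    let conflict = match kind {
        DuplicateKind::LastIndexConflict(c) | DuplicateKind::ErasureConflict(c) => c,
        // An Exists hit may have been part of the same insert batch, so it is
        // re-checked against blockstore only after the batch is written.
        DuplicateKind::Exists => return,
    };
    // gossip (shred_payload, conflict) as a DuplicateSlotProof, then notify consensus
    drop((shred_payload, conflict));
}

fn main() {
    handle(DuplicateKind::LastIndexConflict(vec![1, 2, 3]), vec![4, 5, 6]);
}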

Cargo.lock (generated)

@@ -6066,6 +6066,7 @@ dependencies = [
"solana-version",
"solana-vote-program",
"static_assertions",
"test-case",
"thiserror",
]


@@ -19,7 +19,7 @@ use {
rayon::{prelude::*, ThreadPool},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
-blockstore::{Blockstore, BlockstoreInsertionMetrics},
+blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred},
leader_schedule_cache::LeaderScheduleCache,
shred::{self, Nonce, ReedSolomonCache, Shred},
},
@@ -138,23 +138,37 @@ impl WindowServiceMetrics {
fn run_check_duplicate(
cluster_info: &ClusterInfo,
blockstore: &Blockstore,
-shred_receiver: &Receiver<Shred>,
+shred_receiver: &Receiver<PossibleDuplicateShred>,
duplicate_slots_sender: &DuplicateSlotSender,
) -> Result<()> {
-let check_duplicate = |shred: Shred| -> Result<()> {
+let check_duplicate = |shred: PossibleDuplicateShred| -> Result<()> {
let shred_slot = shred.slot();
-if !blockstore.has_duplicate_shreds_in_slot(shred_slot) {
-if let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) {
-cluster_info.push_duplicate_shred(&shred, &existing_shred_payload)?;
+let (shred1, shred2) = match shred {
+PossibleDuplicateShred::LastIndexConflict(shred, conflict) => (shred, conflict),
+PossibleDuplicateShred::ErasureConflict(shred, conflict) => (shred, conflict),
+PossibleDuplicateShred::Exists(shred) => {
+// Unlike the other cases, we have to wait until here to decide whether to handle
+// the duplicate and store it in blockstore: the duplicate could have been part of
+// the same insert batch, so we wait until the batch has been written.
+if blockstore.has_duplicate_shreds_in_slot(shred_slot) {
+return Ok(()); // A duplicate is already recorded
+}
+let Some(existing_shred_payload) = blockstore.is_shred_duplicate(&shred) else {
+return Ok(()); // Not a duplicate
+};
blockstore.store_duplicate_slot(
shred_slot,
-existing_shred_payload,
-shred.into_payload(),
+existing_shred_payload.clone(),
+shred.clone().into_payload(),
)?;
-duplicate_slots_sender.send(shred_slot)?;
+(shred, existing_shred_payload)
}
-}
+};
+// Propagate the duplicate proof through gossip
+cluster_info.push_duplicate_shred(&shred1, &shred2)?;
+// Notify the duplicate-slot consensus state machine
+duplicate_slots_sender.send(shred_slot)?;
Ok(())
};
@@ -226,7 +240,7 @@ fn run_insert<F>(
reed_solomon_cache: &ReedSolomonCache,
) -> Result<()>
where
-F: Fn(Shred),
+F: Fn(PossibleDuplicateShred),
{
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
@@ -370,7 +384,7 @@ impl WindowService {
cluster_info: Arc<ClusterInfo>,
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
-duplicate_receiver: Receiver<Shred>,
+duplicate_receiver: Receiver<PossibleDuplicateShred>,
duplicate_slots_sender: DuplicateSlotSender,
) -> JoinHandle<()> {
let handle_error = || {
@@ -400,7 +414,7 @@ impl WindowService {
blockstore: Arc<Blockstore>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
verified_receiver: Receiver<Vec<PacketBatch>>,
-check_duplicate_sender: Sender<Shred>,
+check_duplicate_sender: Sender<PossibleDuplicateShred>,
completed_data_sets_sender: CompletedDataSetsSender,
retransmit_sender: Sender<Vec<ShredPayload>>,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
@@ -417,8 +431,8 @@ impl WindowService {
Builder::new()
.name("solWinInsert".to_string())
.spawn(move || {
-let handle_duplicate = |shred| {
-let _ = check_duplicate_sender.send(shred);
+let handle_duplicate = |possible_duplicate_shred| {
+let _ = check_duplicate_sender.send(possible_duplicate_shred);
};
let mut metrics = BlockstoreInsertionMetrics::default();
let mut ws_metrics = WindowServiceMetrics::default();
@@ -551,7 +565,9 @@ mod test {
};
assert_eq!(duplicate_shred.slot(), shreds[0].slot());
let duplicate_shred_slot = duplicate_shred.slot();
-sender.send(duplicate_shred.clone()).unwrap();
+sender
+.send(PossibleDuplicateShred::Exists(duplicate_shred.clone()))
+.unwrap();
assert!(!blockstore.has_duplicate_shreds_in_slot(duplicate_shred_slot));
let keypair = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), timestamp());


@@ -54,6 +54,7 @@ thiserror = { workspace = true }
[dev-dependencies]
num_cpus = { workspace = true }
serial_test = { workspace = true }
test-case = { workspace = true }
[build-dependencies]
rustc_version = { workspace = true }


@@ -271,7 +271,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "6T2sn92PMrTijsgncH3bBZL4K5GUowb442cCw4y4DuwV")]
#[frozen_abi(digest = "EnbW8mYTsPMndq9NkHLTkHJgduXvWSfSD6bBdmqQ8TiF")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {


@@ -149,7 +149,7 @@ impl CrdsGossip {
let now = timestamp();
for entry in entries {
if let Err(err) = crds.insert(entry, now, GossipRoute::LocalMessage) {
error!("push_duplicate_shred faild: {:?}", err);
error!("push_duplicate_shred failed: {:?}", err);
}
}
Ok(())


@@ -3,7 +3,7 @@ use {
itertools::Itertools,
solana_ledger::{
blockstore::BlockstoreError,
-blockstore_meta::DuplicateSlotProof,
+blockstore_meta::{DuplicateSlotProof, ErasureMeta},
shred::{self, Shred, ShredType},
},
solana_sdk::{
@@ -29,7 +29,7 @@ pub struct DuplicateShred {
pub(crate) from: Pubkey,
pub(crate) wallclock: u64,
pub(crate) slot: Slot,
-shred_index: u32,
+_unused: u32,
shred_type: ShredType,
// Serialized DuplicateSlotProof split into chunks.
num_chunks: u8,
@@ -62,6 +62,10 @@ pub enum Error {
InvalidDuplicateShreds,
#[error("invalid duplicate slot proof")]
InvalidDuplicateSlotProof,
#[error("invalid erasure meta conflict")]
InvalidErasureMetaConflict,
#[error("invalid last index conflict")]
InvalidLastIndexConflict,
#[error("invalid signature")]
InvalidSignature,
#[error("invalid size limit")]
@@ -74,8 +78,6 @@ pub enum Error {
MissingDataChunk,
#[error("(de)serialization error")]
SerializationError(#[from] bincode::Error),
#[error("shred index mismatch")]
ShredIndexMismatch,
#[error("shred type mismatch")]
ShredTypeMismatch,
#[error("slot mismatch")]
@@ -86,33 +88,64 @@ pub enum Error {
UnknownSlotLeader(Slot),
}
-// Asserts that the two shreds can indicate duplicate proof for
-// the same triplet of (slot, shred-index, and shred-type_), and
-// that they have valid signatures from the slot leader.
+/// Check that `shred1` and `shred2` indicate a valid duplicate proof
+///     - Must be for the same slot
+///     - Must have the same `shred_type`
+///     - Must both sigverify for the correct leader
+///     - If `shred1` and `shred2` share the same index they must not be equal
+///     - If `shred1` and `shred2` do not share the same index and are data shreds,
+///       verify that they indicate an index conflict: one of them must be the
+///       LAST_SHRED_IN_SLOT, and the other shred must have a higher index.
+///     - If `shred1` and `shred2` do not share the same index and are coding shreds,
+///       verify that they have conflicting erasure metas
fn check_shreds<F>(leader_schedule: Option<F>, shred1: &Shred, shred2: &Shred) -> Result<(), Error>
where
F: FnOnce(Slot) -> Option<Pubkey>,
{
if shred1.slot() != shred2.slot() {
-Err(Error::SlotMismatch)
-} else if shred1.index() != shred2.index() {
-// TODO: Should also allow two coding shreds with different indices but
-// same fec-set-index and mismatching erasure-config.
-Err(Error::ShredIndexMismatch)
-} else if shred1.shred_type() != shred2.shred_type() {
-Err(Error::ShredTypeMismatch)
-} else if shred1.payload() == shred2.payload() {
-Err(Error::InvalidDuplicateShreds)
-} else {
-if let Some(leader_schedule) = leader_schedule {
-let slot_leader =
-leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader(shred1.slot()))?;
-if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) {
-return Err(Error::InvalidSignature);
-}
-}
-Ok(())
+return Err(Error::SlotMismatch);
}
+if shred1.shred_type() != shred2.shred_type() {
+return Err(Error::ShredTypeMismatch);
+}
+if let Some(leader_schedule) = leader_schedule {
+let slot_leader =
+leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader(shred1.slot()))?;
+if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) {
+return Err(Error::InvalidSignature);
+}
+}
+if shred1.index() == shred2.index() {
+if shred1.payload() != shred2.payload() {
+return Ok(());
+}
+return Err(Error::InvalidDuplicateShreds);
+}
+if shred1.shred_type() == ShredType::Data {
+if shred1.last_in_slot() && shred2.index() > shred1.index() {
+return Ok(());
+}
+if shred2.last_in_slot() && shred1.index() > shred2.index() {
+return Ok(());
+}
+return Err(Error::InvalidLastIndexConflict);
+}
+// This mirrors the current logic in blockstore to detect coding shreds with
+// conflicting erasure sets. It is not technically exhaustive: any two coding
+// shreds with different but overlapping erasure sets are duplicates even if
+// they are not part of the same FEC set. Further work to enhance detection is
+// planned in https://github.com/solana-labs/solana/issues/33037
+if shred1.fec_set_index() == shred2.fec_set_index()
+&& !ErasureMeta::check_erasure_consistency(shred1, shred2)
+{
+return Ok(());
+}
+Err(Error::InvalidErasureMetaConflict)
}
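The data-shred branch above reduces to a small predicate; a minimal sketch with plain integers (hypothetical helper for illustration, not the real API):

// Valid proof: one shred claims to end the slot while the other shows data
// exists at a higher index than that claimed end.
fn is_last_index_conflict(idx1: u32, last1: bool, idx2: u32, last2: bool) -> bool {
    (last1 && idx2 > idx1) || (last2 && idx1 > idx2)
}

fn main() {
    assert!(is_last_index_conflict(10, true, 11, false)); // index 11 past claimed end 10
    assert!(!is_last_index_conflict(10, false, 11, true)); // consistent: 10 precedes last index 11
    assert!(is_last_index_conflict(10, true, 110, true)); // two competing "last" claims also conflict
}

This matches the tests below: two last_in_slot shreds at different indices form a valid proof, while a non-last shred below the claimed last index does not.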
pub(crate) fn from_shred<F>(
@@ -131,7 +164,7 @@ where
}
let other_shred = Shred::new_from_serialized_shred(other_payload)?;
check_shreds(leader_schedule, &shred, &other_shred)?;
-let (slot, shred_index, shred_type) = (shred.slot(), shred.index(), shred.shred_type());
+let (slot, shred_type) = (shred.slot(), shred.shred_type());
let proof = DuplicateSlotProof {
shred1: shred.into_payload(),
shred2: other_shred.into_payload(),
@@ -151,28 +184,25 @@ where
from: self_pubkey,
wallclock,
slot,
-shred_index,
shred_type,
num_chunks,
chunk_index: i as u8,
chunk,
+_unused: 0,
});
Ok(chunks)
}
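For intuition on the chunk counts the tests assert on, a back-of-envelope in the same spirit (the header size and proof length here are assumed illustrative values, not the real constants):

fn main() {
    let max_size = 512; // max serialized size per DuplicateShred, as in the tests below
    let header = 63; // stand-in for DUPLICATE_SHRED_HEADER_SIZE (assumed value)
    let chunk_size = max_size - header;
    let proof_len = 2 * 1228; // a DuplicateSlotProof carries two shred payloads (assumed length)
    let num_chunks = (proof_len + chunk_size - 1) / chunk_size; // ceiling division, like data.chunks()
    println!("{chunk_size} bytes per chunk -> {num_chunks} chunks"); // 449 bytes -> 6 chunks
    assert!(num_chunks > 4); // why the tests can assert chunks.len() > 4
}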
// Returns a predicate checking if a duplicate-shred chunk matches
-// (slot, shred_index, shred_type) and has valid chunk_index.
+// (slot, shred_type) and has valid chunk_index.
fn check_chunk(
slot: Slot,
-shred_index: u32,
shred_type: ShredType,
num_chunks: u8,
) -> impl Fn(&DuplicateShred) -> Result<(), Error> {
move |dup| {
if dup.slot != slot {
Err(Error::SlotMismatch)
-} else if dup.shred_index != shred_index {
-Err(Error::ShredIndexMismatch)
} else if dup.shred_type != shred_type {
Err(Error::ShredTypeMismatch)
} else if dup.num_chunks != num_chunks {
@@ -196,14 +226,13 @@ pub(crate) fn into_shreds(
let mut chunks = chunks.into_iter();
let DuplicateShred {
slot,
-shred_index,
shred_type,
num_chunks,
chunk_index,
chunk,
..
} = chunks.next().ok_or(Error::InvalidDuplicateShreds)?;
-let check_chunk = check_chunk(slot, shred_index, shred_type, num_chunks);
+let check_chunk = check_chunk(slot, shred_type, num_chunks);
let mut data = HashMap::new();
data.insert(chunk_index, chunk);
for chunk in chunks {
@@ -231,15 +260,10 @@ pub(crate) fn into_shreds(
let shred2 = Shred::new_from_serialized_shred(proof.shred2)?;
if shred1.slot() != slot || shred2.slot() != slot {
Err(Error::SlotMismatch)
-} else if shred1.index() != shred_index || shred2.index() != shred_index {
-Err(Error::ShredIndexMismatch)
} else if shred1.shred_type() != shred_type || shred2.shred_type() != shred_type {
Err(Error::ShredTypeMismatch)
-} else if shred1.payload() == shred2.payload() {
-Err(Error::InvalidDuplicateShreds)
-} else if !shred1.verify(slot_leader) || !shred2.verify(slot_leader) {
-Err(Error::InvalidSignature)
} else {
+check_shreds(Some(|_| Some(slot_leader).copied()), &shred1, &shred2)?;
Ok((shred1, shred2))
}
}
@@ -267,6 +291,7 @@ pub(crate) mod tests {
system_transaction,
},
std::sync::Arc,
test_case::test_case,
};
#[test]
@@ -275,11 +300,11 @@ pub(crate) mod tests {
from: Pubkey::new_unique(),
wallclock: u64::MAX,
slot: Slot::MAX,
-shred_index: u32::MAX,
shred_type: ShredType::Data,
num_chunks: u8::MAX,
chunk_index: u8::MAX,
chunk: Vec::default(),
+_unused: 0,
};
assert_eq!(
bincode::serialize(&dup).unwrap().len(),
@@ -297,6 +322,71 @@ pub(crate) mod tests {
shredder: &Shredder,
keypair: &Keypair,
) -> Shred {
let (mut data_shreds, _) = new_rand_shreds(
rng,
next_shred_index,
next_shred_index,
5,
true,
shredder,
keypair,
true,
);
data_shreds.pop().unwrap()
}
fn new_rand_data_shred<R: Rng>(
rng: &mut R,
next_shred_index: u32,
shredder: &Shredder,
keypair: &Keypair,
merkle_variant: bool,
is_last_in_slot: bool,
) -> Shred {
let (mut data_shreds, _) = new_rand_shreds(
rng,
next_shred_index,
next_shred_index,
5,
merkle_variant,
shredder,
keypair,
is_last_in_slot,
);
data_shreds.pop().unwrap()
}
fn new_rand_coding_shreds<R: Rng>(
rng: &mut R,
next_shred_index: u32,
num_entries: usize,
shredder: &Shredder,
keypair: &Keypair,
merkle_variant: bool,
) -> Vec<Shred> {
let (_, coding_shreds) = new_rand_shreds(
rng,
next_shred_index,
next_shred_index,
num_entries,
merkle_variant,
shredder,
keypair,
true,
);
coding_shreds
}
fn new_rand_shreds<R: Rng>(
rng: &mut R,
next_shred_index: u32,
next_code_index: u32,
num_entries: usize,
merkle_variant: bool,
shredder: &Shredder,
keypair: &Keypair,
is_last_in_slot: bool,
) -> (Vec<Shred>, Vec<Shred>) {
let entries: Vec<_> = std::iter::repeat_with(|| {
let tx = system_transaction::transfer(
&Keypair::new(), // from
@@ -310,30 +400,76 @@ pub(crate) mod tests {
vec![tx], // transactions
)
})
-.take(5)
+.take(num_entries)
.collect();
-let (mut data_shreds, _coding_shreds) = shredder.entries_to_shreds(
+shredder.entries_to_shreds(
keypair,
&entries,
-true, // is_last_in_slot
+is_last_in_slot,
next_shred_index,
-next_shred_index, // next_code_index
-true, // merkle_variant
+next_code_index, // next_code_index
+merkle_variant,
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
-data_shreds.swap_remove(0)
+)
}
-#[test]
-fn test_duplicate_shred_round_trip() {
+fn from_shred_bypass_checks(
+shred: Shred,
+self_pubkey: Pubkey, // Pubkey of my node broadcasting crds value.
+other_shred: Shred,
+wallclock: u64,
+max_size: usize, // Maximum serialized size of each DuplicateShred.
+) -> Result<impl Iterator<Item = DuplicateShred>, Error> {
+let (slot, shred_type) = (shred.slot(), shred.shred_type());
+let proof = DuplicateSlotProof {
+shred1: shred.into_payload(),
+shred2: other_shred.into_payload(),
+};
+let data = bincode::serialize(&proof)?;
+let chunk_size = max_size - DUPLICATE_SHRED_HEADER_SIZE;
+let chunks: Vec<_> = data.chunks(chunk_size).map(Vec::from).collect();
+let num_chunks = u8::try_from(chunks.len())?;
+let chunks = chunks
+.into_iter()
+.enumerate()
+.map(move |(i, chunk)| DuplicateShred {
+from: self_pubkey,
+wallclock,
+slot,
+shred_type,
+num_chunks,
+chunk_index: i as u8,
+chunk,
+_unused: 0,
+});
+Ok(chunks)
+}
+#[test_case(true ; "merkle")]
+#[test_case(false ; "legacy")]
+fn test_duplicate_shred_round_trip(merkle_variant: bool) {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen_range(0..32_000);
-let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
-let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
+let shred1 = new_rand_data_shred(
+&mut rng,
+next_shred_index,
+&shredder,
+&leader,
+merkle_variant,
+true,
+);
+let shred2 = new_rand_data_shred(
+&mut rng,
+next_shred_index,
+&shredder,
+&leader,
+merkle_variant,
+true,
+);
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
@@ -356,4 +492,461 @@ pub(crate) mod tests {
assert_eq!(shred1, shred3);
assert_eq!(shred2, shred4);
}
#[test_case(true ; "merkle")]
#[test_case(false ; "legacy")]
fn test_duplicate_shred_invalid(merkle_variant: bool) {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen_range(0..32_000);
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
} else {
None
}
};
let data_shred = new_rand_data_shred(
&mut rng,
next_shred_index,
&shredder,
&leader,
merkle_variant,
true,
);
let coding_shreds = new_rand_coding_shreds(
&mut rng,
next_shred_index,
10,
&shredder,
&leader,
merkle_variant,
);
let test_cases = vec![
// Same data_shred
(data_shred.clone(), data_shred),
// Same coding_shred
(coding_shreds[0].clone(), coding_shreds[0].clone()),
];
for (shred1, shred2) in test_cases.into_iter() {
assert_matches!(
from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.payload().clone(),
Some(leader_schedule),
rng.gen(), // wallclock
512, // max_size
)
.err()
.unwrap(),
Error::InvalidDuplicateShreds
);
let chunks: Vec<_> = from_shred_bypass_checks(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.clone(),
rng.gen(), // wallclock
512, // max_size
)
.unwrap()
.collect();
assert!(chunks.len() > 4);
assert_matches!(
into_shreds(&leader.pubkey(), chunks).err().unwrap(),
Error::InvalidDuplicateSlotProof
);
}
}
#[test_case(true ; "merkle")]
#[test_case(false ; "legacy")]
fn test_latest_index_conflict_round_trip(merkle_variant: bool) {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen_range(0..31_000);
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
} else {
None
}
};
let test_cases = vec![
(
new_rand_data_shred(
&mut rng,
next_shred_index,
&shredder,
&leader,
merkle_variant,
true,
),
new_rand_data_shred(
&mut rng,
next_shred_index + 1,
&shredder,
&leader,
merkle_variant,
false,
),
),
(
new_rand_data_shred(
&mut rng,
next_shred_index + 1,
&shredder,
&leader,
merkle_variant,
false,
),
new_rand_data_shred(
&mut rng,
next_shred_index,
&shredder,
&leader,
merkle_variant,
true,
),
),
(
new_rand_data_shred(
&mut rng,
next_shred_index + 100,
&shredder,
&leader,
merkle_variant,
true,
),
new_rand_data_shred(
&mut rng,
next_shred_index,
&shredder,
&leader,
merkle_variant,
true,
),
),
(
new_rand_data_shred(
&mut rng,
next_shred_index,
&shredder,
&leader,
merkle_variant,
true,
),
new_rand_data_shred(
&mut rng,
next_shred_index + 100,
&shredder,
&leader,
merkle_variant,
true,
),
),
];
for (shred1, shred2) in test_cases.into_iter() {
let chunks: Vec<_> = from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.payload().clone(),
Some(leader_schedule),
rng.gen(), // wallclock
512, // max_size
)
.unwrap()
.collect();
assert!(chunks.len() > 4);
let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap();
assert_eq!(shred1, shred3);
assert_eq!(shred2, shred4);
}
}
#[test_case(true ; "merkle")]
#[test_case(false ; "legacy")]
fn test_latest_index_conflict_invalid(merkle_variant: bool) {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen_range(0..31_000);
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
} else {
None
}
};
let test_cases = vec![
(
new_rand_data_shred(
&mut rng,
next_shred_index,
&shredder,
&leader,
merkle_variant,
false,
),
new_rand_data_shred(
&mut rng,
next_shred_index + 1,
&shredder,
&leader,
merkle_variant,
true,
),
),
(
new_rand_data_shred(
&mut rng,
next_shred_index + 1,
&shredder,
&leader,
merkle_variant,
true,
),
new_rand_data_shred(
&mut rng,
next_shred_index,
&shredder,
&leader,
merkle_variant,
false,
),
),
(
new_rand_data_shred(
&mut rng,
next_shred_index + 100,
&shredder,
&leader,
merkle_variant,
false,
),
new_rand_data_shred(
&mut rng,
next_shred_index,
&shredder,
&leader,
merkle_variant,
false,
),
),
(
new_rand_data_shred(
&mut rng,
next_shred_index,
&shredder,
&leader,
merkle_variant,
false,
),
new_rand_data_shred(
&mut rng,
next_shred_index + 100,
&shredder,
&leader,
merkle_variant,
false,
),
),
];
for (shred1, shred2) in test_cases.into_iter() {
assert_matches!(
from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.payload().clone(),
Some(leader_schedule),
rng.gen(), // wallclock
512, // max_size
)
.err()
.unwrap(),
Error::InvalidLastIndexConflict
);
let chunks: Vec<_> = from_shred_bypass_checks(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.clone(),
rng.gen(), // wallclock
512, // max_size
)
.unwrap()
.collect();
assert!(chunks.len() > 4);
assert_matches!(
into_shreds(&leader.pubkey(), chunks).err().unwrap(),
Error::InvalidLastIndexConflict
);
}
}
#[test_case(true ; "merkle")]
#[test_case(false ; "legacy")]
fn test_erasure_meta_conflict_round_trip(merkle_variant: bool) {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen_range(0..31_000);
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
} else {
None
}
};
let coding_shreds = new_rand_coding_shreds(
&mut rng,
next_shred_index,
10,
&shredder,
&leader,
merkle_variant,
);
let coding_shreds_bigger = new_rand_coding_shreds(
&mut rng,
next_shred_index,
13,
&shredder,
&leader,
merkle_variant,
);
let coding_shreds_smaller = new_rand_coding_shreds(
&mut rng,
next_shred_index,
7,
&shredder,
&leader,
merkle_variant,
);
// Same fec-set, different index, different erasure meta
let test_cases = vec![
(coding_shreds[0].clone(), coding_shreds_bigger[1].clone()),
(coding_shreds[0].clone(), coding_shreds_smaller[1].clone()),
];
for (shred1, shred2) in test_cases.into_iter() {
let chunks: Vec<_> = from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.payload().clone(),
Some(leader_schedule),
rng.gen(), // wallclock
512, // max_size
)
.unwrap()
.collect();
assert!(chunks.len() > 4);
let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap();
assert_eq!(shred1, shred3);
assert_eq!(shred2, shred4);
}
}
#[test_case(true ; "merkle")]
#[test_case(false ; "legacy")]
fn test_erasure_meta_conflict_invalid(merkle_variant: bool) {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen_range(0..31_000);
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
} else {
None
}
};
let coding_shreds = new_rand_coding_shreds(
&mut rng,
next_shred_index,
10,
&shredder,
&leader,
merkle_variant,
);
let coding_shreds_different_fec = new_rand_coding_shreds(
&mut rng,
next_shred_index + 1,
10,
&shredder,
&leader,
merkle_variant,
);
let coding_shreds_different_fec_and_size = new_rand_coding_shreds(
&mut rng,
next_shred_index + 1,
13,
&shredder,
&leader,
merkle_variant,
);
let test_cases = vec![
// Different index, different fec set, same erasure meta
(
coding_shreds[0].clone(),
coding_shreds_different_fec[1].clone(),
),
// Different index, different fec set, different erasure meta
(
coding_shreds[0].clone(),
coding_shreds_different_fec_and_size[1].clone(),
),
// Different index, same fec set, same erasure meta
(coding_shreds[0].clone(), coding_shreds[1].clone()),
(
coding_shreds_different_fec[0].clone(),
coding_shreds_different_fec[1].clone(),
),
(
coding_shreds_different_fec_and_size[0].clone(),
coding_shreds_different_fec_and_size[1].clone(),
),
];
for (shred1, shred2) in test_cases.into_iter() {
assert_matches!(
from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.payload().clone(),
Some(leader_schedule),
rng.gen(), // wallclock
512, // max_size
)
.err()
.unwrap(),
Error::InvalidErasureMetaConflict
);
let chunks: Vec<_> = from_shred_bypass_checks(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.clone(),
rng.gen(), // wallclock
512, // max_size
)
.unwrap()
.collect();
assert!(chunks.len() > 4);
assert_matches!(
into_shreds(&leader.pubkey(), chunks).err().unwrap(),
Error::InvalidErasureMetaConflict
);
}
}
}


@@ -243,9 +243,6 @@ mod tests {
Some(Error::SlotMismatch) => {
new_rand_shred(&mut rng, next_shred_index, &shredder1, &my_keypair)
}
-Some(Error::ShredIndexMismatch) => {
-new_rand_shred(&mut rng, next_shred_index + 1, &shredder, &my_keypair)
-}
Some(Error::InvalidDuplicateShreds) => shred1.clone(),
_ => new_rand_shred(&mut rng, next_shred_index, &shredder, &my_keypair),
};
@@ -313,7 +310,6 @@ mod tests {
for error in [
Error::InvalidSignature,
Error::SlotMismatch,
-Error::ShredIndexMismatch,
Error::InvalidDuplicateShreds,
] {
match create_duplicate_proof(


@@ -135,9 +135,26 @@ impl std::fmt::Display for InsertDataShredError {
}
}
+#[derive(Eq, PartialEq, Debug, Clone)]
+pub enum PossibleDuplicateShred {
+Exists(Shred), // Blockstore has another shred in its spot
+LastIndexConflict(/* original */ Shred, /* conflict */ Vec<u8>), // The index of this shred conflicts with `slot_meta.last_index`
+ErasureConflict(/* original */ Shred, /* conflict */ Vec<u8>), // The coding shred has a conflict in the erasure_meta
+}
+impl PossibleDuplicateShred {
+pub fn slot(&self) -> Slot {
+match self {
+Self::Exists(shred) => shred.slot(),
+Self::LastIndexConflict(shred, _) => shred.slot(),
+Self::ErasureConflict(shred, _) => shred.slot(),
+}
+}
+}
pub struct InsertResults {
completed_data_set_infos: Vec<CompletedDataSetInfo>,
-duplicate_shreds: Vec<Shred>,
+duplicate_shreds: Vec<PossibleDuplicateShred>,
}
/// A "complete data set" is a range of [`Shred`]s that combined in sequence carry a single
@@ -1047,7 +1064,7 @@ impl Blockstore {
metrics: &mut BlockstoreInsertionMetrics,
) -> Result<Vec<CompletedDataSetInfo>>
where
-F: Fn(Shred),
+F: Fn(PossibleDuplicateShred),
{
let InsertResults {
completed_data_set_infos,
@@ -1165,7 +1182,7 @@ impl Blockstore {
write_batch: &mut WriteBatch,
just_received_shreds: &mut HashMap<ShredId, Shred>,
index_meta_time_us: &mut u64,
-duplicate_shreds: &mut Vec<Shred>,
+duplicate_shreds: &mut Vec<PossibleDuplicateShred>,
is_trusted: bool,
shred_source: ShredSource,
metrics: &mut BlockstoreInsertionMetrics,
@@ -1184,7 +1201,7 @@ impl Blockstore {
if !is_trusted {
if index_meta.coding().contains(shred_index) {
metrics.num_coding_shreds_exists += 1;
-duplicate_shreds.push(shred);
+duplicate_shreds.push(PossibleDuplicateShred::Exists(shred));
return false;
}
@@ -1201,8 +1218,6 @@ impl Blockstore {
.unwrap_or_else(|| ErasureMeta::from_coding_shred(&shred).unwrap())
});
-// TODO: handle_duplicate is not invoked and so duplicate shreds are
-// not gossiped to the rest of cluster.
if !erasure_meta.check_coding_shred(&shred) {
metrics.num_coding_shreds_invalid_erasure_config += 1;
let conflicting_shred = self.find_conflicting_coding_shred(
@@ -1212,15 +1227,22 @@ impl Blockstore {
just_received_shreds,
);
if let Some(conflicting_shred) = conflicting_shred {
-if self
-.store_duplicate_if_not_existing(
-slot,
+if !self.has_duplicate_shreds_in_slot(slot) {
+if self
+.store_duplicate_slot(
+slot,
+conflicting_shred.clone(),
+shred.payload().clone(),
+)
+.is_err()
+{
+warn!("bad duplicate store..");
+}
+duplicate_shreds.push(PossibleDuplicateShred::ErasureConflict(
+shred.clone(),
conflicting_shred,
-shred.payload().clone(),
-)
-.is_err()
-{
-warn!("bad duplicate store..");
+));
}
} else {
datapoint_info!("bad-conflict-shred", ("slot", slot, i64));
@@ -1338,7 +1360,7 @@ impl Blockstore {
just_inserted_shreds: &mut HashMap<ShredId, Shred>,
index_meta_time_us: &mut u64,
is_trusted: bool,
-duplicate_shreds: &mut Vec<Shred>,
+duplicate_shreds: &mut Vec<PossibleDuplicateShred>,
leader_schedule: Option<&LeaderScheduleCache>,
shred_source: ShredSource,
) -> std::result::Result<Vec<CompletedDataSetInfo>, InsertDataShredError> {
@@ -1360,7 +1382,7 @@ impl Blockstore {
if !is_trusted {
if Self::is_data_shred_present(&shred, slot_meta, index_meta.data()) {
-duplicate_shreds.push(shred);
+duplicate_shreds.push(PossibleDuplicateShred::Exists(shred));
return Err(InsertDataShredError::Exists);
}
@@ -1385,6 +1407,7 @@ impl Blockstore {
&self.last_root,
leader_schedule,
shred_source,
duplicate_shreds,
) {
return Err(InsertDataShredError::InvalidShred);
}
@@ -1466,6 +1489,7 @@ impl Blockstore {
last_root: &RwLock<u64>,
leader_schedule: Option<&LeaderScheduleCache>,
shred_source: ShredSource,
duplicate_shreds: &mut Vec<PossibleDuplicateShred>,
) -> bool {
let shred_index = u64::from(shred.index());
let slot = shred.slot();
@@ -1483,21 +1507,25 @@ impl Blockstore {
let leader_pubkey = leader_schedule
.and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None));
-let ending_shred: Cow<Vec<u8>> = self.get_data_shred_from_just_inserted_or_db(
-just_inserted_shreds,
-slot,
-last_index.unwrap(),
-);
+if !self.has_duplicate_shreds_in_slot(slot) {
+let ending_shred: Vec<u8> = self
+.get_data_shred_from_just_inserted_or_db(
+just_inserted_shreds,
+slot,
+last_index.unwrap(),
+)
+.into_owned();
-if self
-.store_duplicate_if_not_existing(
-slot,
-ending_shred.into_owned(),
-shred.payload().clone(),
-)
-.is_err()
-{
-warn!("store duplicate error");
+if self
+.store_duplicate_slot(slot, ending_shred.clone(), shred.payload().clone())
+.is_err()
+{
+warn!("store duplicate error");
+}
+duplicate_shreds.push(PossibleDuplicateShred::LastIndexConflict(
+shred.clone(),
+ending_shred,
+));
}
datapoint_error!(
@@ -1518,21 +1546,25 @@ impl Blockstore {
let leader_pubkey = leader_schedule
.and_then(|leader_schedule| leader_schedule.slot_leader_at(slot, None));
-let ending_shred: Cow<Vec<u8>> = self.get_data_shred_from_just_inserted_or_db(
-just_inserted_shreds,
-slot,
-slot_meta.received - 1,
-);
+if !self.has_duplicate_shreds_in_slot(slot) {
+let ending_shred: Vec<u8> = self
+.get_data_shred_from_just_inserted_or_db(
+just_inserted_shreds,
+slot,
+slot_meta.received - 1,
+)
+.into_owned();
-if self
-.store_duplicate_if_not_existing(
-slot,
-ending_shred.into_owned(),
-shred.payload().clone(),
-)
-.is_err()
-{
-warn!("store duplicate error");
+if self
+.store_duplicate_slot(slot, ending_shred.clone(), shred.payload().clone())
+.is_err()
+{
+warn!("store duplicate error");
+}
+duplicate_shreds.push(PossibleDuplicateShred::LastIndexConflict(
+shred.clone(),
+ending_shred,
+));
}
datapoint_error!(
@@ -3224,19 +3256,6 @@ impl Blockstore {
self.duplicate_slots_cf.delete(slot)
}
-pub fn store_duplicate_if_not_existing(
-&self,
-slot: Slot,
-shred1: Vec<u8>,
-shred2: Vec<u8>,
-) -> Result<()> {
-if !self.has_duplicate_shreds_in_slot(slot) {
-self.store_duplicate_slot(slot, shred1, shred2)
-} else {
-Ok(())
-}
-}
pub fn get_first_duplicate_proof(&self) -> Option<(Slot, DuplicateSlotProof)> {
let mut iter = self
.db
@@ -6571,6 +6590,7 @@ pub mod tests {
&last_root,
None,
ShredSource::Repaired,
&mut Vec::new(),
));
// Trying to insert another "is_last" shred with index < the received index should fail
// skip over shred 7
@@ -6587,6 +6607,7 @@ pub mod tests {
panic!("Shred in unexpected format")
}
};
let mut duplicate_shreds = vec![];
assert!(!blockstore.should_insert_data_shred(
&shred7,
&slot_meta,
@@ -6594,8 +6615,15 @@ pub mod tests {
&last_root,
None,
ShredSource::Repaired,
&mut duplicate_shreds,
));
assert!(blockstore.has_duplicate_shreds_in_slot(0));
assert_eq!(duplicate_shreds.len(), 1);
assert_matches!(
duplicate_shreds[0],
PossibleDuplicateShred::LastIndexConflict(_, _)
);
assert_eq!(duplicate_shreds[0].slot(), 0);
// Insert all pending shreds
let mut shred8 = shreds[8].clone();
@@ -6604,18 +6632,30 @@ pub mod tests {
// Trying to insert a shred with index > the "is_last" shred should fail
if shred8.is_data() {
-shred8.set_slot(slot_meta.last_index.unwrap() + 1);
+shred8.set_index((slot_meta.last_index.unwrap() + 1) as u32);
} else {
panic!("Shred in unexpected format")
}
+duplicate_shreds.clear();
+blockstore.duplicate_slots_cf.delete(0).unwrap();
+assert!(!blockstore.has_duplicate_shreds_in_slot(0));
assert!(!blockstore.should_insert_data_shred(
-&shred7,
+&shred8,
&slot_meta,
&HashMap::new(),
&last_root,
None,
ShredSource::Repaired,
+&mut duplicate_shreds,
));
+assert_eq!(duplicate_shreds.len(), 1);
+assert_matches!(
+duplicate_shreds[0],
+PossibleDuplicateShred::LastIndexConflict(_, _)
+);
+assert_eq!(duplicate_shreds[0].slot(), 0);
+assert!(blockstore.has_duplicate_shreds_in_slot(0));
}
#[test]
@@ -6701,7 +6741,10 @@ pub mod tests {
ShredSource::Turbine,
&mut BlockstoreInsertionMetrics::default(),
));
-assert_eq!(duplicate_shreds, vec![coding_shred]);
+assert_eq!(
+duplicate_shreds,
+vec![PossibleDuplicateShred::Exists(coding_shred)]
+);
}
#[test]


@@ -352,6 +352,15 @@ impl ErasureMeta {
self == &other
}
/// Returns true if both shreds are coding shreds and have a
/// consistent erasure config
pub fn check_erasure_consistency(shred1: &Shred, shred2: &Shred) -> bool {
let Some(coding_shred) = Self::from_coding_shred(shred1) else {
return false;
};
coding_shred.check_coding_shred(shred2)
}
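As a toy model of what a "consistent erasure config" means for two coding shreds in one FEC set (simplified hypothetical types; the real check also validates the shred's position within the set):

#[derive(PartialEq, Eq, Debug, Clone, Copy)]
struct ToyErasureConfig {
    num_data: usize,
    num_coding: usize,
}

// Two coding shreds in the same FEC set must agree on the erasure config the
// leader signed; the same set index with different configs is proof material.
fn toy_consistent(fec1: u32, cfg1: ToyErasureConfig, fec2: u32, cfg2: ToyErasureConfig) -> bool {
    fec1 == fec2 && cfg1 == cfg2
}

fn main() {
    let a = ToyErasureConfig { num_data: 10, num_coding: 10 };
    let b = ToyErasureConfig { num_data: 13, num_coding: 13 };
    assert!(toy_consistent(100, a, 100, a));
    assert!(!toy_consistent(100, a, 100, b)); // same FEC set, incompatible layouts
}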
pub(crate) fn config(&self) -> ErasureConfig {
self.config
}