360 lines
12 KiB
Rust
360 lines
12 KiB
Rust
use {
|
|
crate::crds_value::sanitize_wallclock,
|
|
itertools::Itertools,
|
|
solana_ledger::{
|
|
blockstore::BlockstoreError,
|
|
blockstore_meta::DuplicateSlotProof,
|
|
shred::{self, Shred, ShredType},
|
|
},
|
|
solana_sdk::{
|
|
clock::Slot,
|
|
pubkey::Pubkey,
|
|
sanitize::{Sanitize, SanitizeError},
|
|
},
|
|
std::{
|
|
collections::{hash_map::Entry, HashMap},
|
|
convert::TryFrom,
|
|
num::TryFromIntError,
|
|
},
|
|
thiserror::Error,
|
|
};
|
|
|
|
const DUPLICATE_SHRED_HEADER_SIZE: usize = 63;
|
|
|
|
pub(crate) type DuplicateShredIndex = u16;
|
|
pub(crate) const MAX_DUPLICATE_SHREDS: DuplicateShredIndex = 512;
|
|
|
|
#[derive(Clone, Debug, PartialEq, Eq, AbiExample, Deserialize, Serialize)]
|
|
pub struct DuplicateShred {
|
|
pub(crate) from: Pubkey,
|
|
pub(crate) wallclock: u64,
|
|
pub(crate) slot: Slot,
|
|
shred_index: u32,
|
|
shred_type: ShredType,
|
|
// Serialized DuplicateSlotProof split into chunks.
|
|
num_chunks: u8,
|
|
chunk_index: u8,
|
|
#[serde(with = "serde_bytes")]
|
|
chunk: Vec<u8>,
|
|
}
|
|
|
|
impl DuplicateShred {
|
|
#[inline]
|
|
pub(crate) fn num_chunks(&self) -> u8 {
|
|
self.num_chunks
|
|
}
|
|
|
|
#[inline]
|
|
pub(crate) fn chunk_index(&self) -> u8 {
|
|
self.chunk_index
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Error)]
|
|
pub enum Error {
|
|
#[error("block store save error")]
|
|
BlockstoreInsertFailed(#[from] BlockstoreError),
|
|
#[error("data chunk mismatch")]
|
|
DataChunkMismatch,
|
|
#[error("invalid chunk_index: {chunk_index}, num_chunks: {num_chunks}")]
|
|
InvalidChunkIndex { chunk_index: u8, num_chunks: u8 },
|
|
#[error("invalid duplicate shreds")]
|
|
InvalidDuplicateShreds,
|
|
#[error("invalid duplicate slot proof")]
|
|
InvalidDuplicateSlotProof,
|
|
#[error("invalid signature")]
|
|
InvalidSignature,
|
|
#[error("invalid size limit")]
|
|
InvalidSizeLimit,
|
|
#[error(transparent)]
|
|
InvalidShred(#[from] shred::Error),
|
|
#[error("number of chunks mismatch")]
|
|
NumChunksMismatch,
|
|
#[error("missing data chunk")]
|
|
MissingDataChunk,
|
|
#[error("(de)serialization error")]
|
|
SerializationError(#[from] bincode::Error),
|
|
#[error("shred index mismatch")]
|
|
ShredIndexMismatch,
|
|
#[error("shred type mismatch")]
|
|
ShredTypeMismatch,
|
|
#[error("slot mismatch")]
|
|
SlotMismatch,
|
|
#[error("type conversion error")]
|
|
TryFromIntError(#[from] TryFromIntError),
|
|
#[error("unknown slot leader: {0}")]
|
|
UnknownSlotLeader(Slot),
|
|
}
|
|
|
|
// Asserts that the two shreds can indicate duplicate proof for
|
|
// the same triplet of (slot, shred-index, and shred-type_), and
|
|
// that they have valid signatures from the slot leader.
|
|
fn check_shreds<F>(leader_schedule: Option<F>, shred1: &Shred, shred2: &Shred) -> Result<(), Error>
|
|
where
|
|
F: FnOnce(Slot) -> Option<Pubkey>,
|
|
{
|
|
if shred1.slot() != shred2.slot() {
|
|
Err(Error::SlotMismatch)
|
|
} else if shred1.index() != shred2.index() {
|
|
// TODO: Should also allow two coding shreds with different indices but
|
|
// same fec-set-index and mismatching erasure-config.
|
|
Err(Error::ShredIndexMismatch)
|
|
} else if shred1.shred_type() != shred2.shred_type() {
|
|
Err(Error::ShredTypeMismatch)
|
|
} else if shred1.payload() == shred2.payload() {
|
|
Err(Error::InvalidDuplicateShreds)
|
|
} else {
|
|
if let Some(leader_schedule) = leader_schedule {
|
|
let slot_leader =
|
|
leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader(shred1.slot()))?;
|
|
if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) {
|
|
return Err(Error::InvalidSignature);
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
pub(crate) fn from_shred<F>(
|
|
shred: Shred,
|
|
self_pubkey: Pubkey, // Pubkey of my node broadcasting crds value.
|
|
other_payload: Vec<u8>,
|
|
leader_schedule: Option<F>,
|
|
wallclock: u64,
|
|
max_size: usize, // Maximum serialized size of each DuplicateShred.
|
|
) -> Result<impl Iterator<Item = DuplicateShred>, Error>
|
|
where
|
|
F: FnOnce(Slot) -> Option<Pubkey>,
|
|
{
|
|
if shred.payload() == &other_payload {
|
|
return Err(Error::InvalidDuplicateShreds);
|
|
}
|
|
let other_shred = Shred::new_from_serialized_shred(other_payload.clone())?;
|
|
check_shreds(leader_schedule, &shred, &other_shred)?;
|
|
let (slot, shred_index, shred_type) = (shred.slot(), shred.index(), shred.shred_type());
|
|
let proof = DuplicateSlotProof {
|
|
shred1: shred.into_payload(),
|
|
shred2: other_payload,
|
|
};
|
|
let data = bincode::serialize(&proof)?;
|
|
let chunk_size = if DUPLICATE_SHRED_HEADER_SIZE < max_size {
|
|
max_size - DUPLICATE_SHRED_HEADER_SIZE
|
|
} else {
|
|
return Err(Error::InvalidSizeLimit);
|
|
};
|
|
let chunks: Vec<_> = data.chunks(chunk_size).map(Vec::from).collect();
|
|
let num_chunks = u8::try_from(chunks.len())?;
|
|
let chunks = chunks
|
|
.into_iter()
|
|
.enumerate()
|
|
.map(move |(i, chunk)| DuplicateShred {
|
|
from: self_pubkey,
|
|
wallclock,
|
|
slot,
|
|
shred_index,
|
|
shred_type,
|
|
num_chunks,
|
|
chunk_index: i as u8,
|
|
chunk,
|
|
});
|
|
Ok(chunks)
|
|
}
|
|
|
|
// Returns a predicate checking if a duplicate-shred chunk matches
|
|
// (slot, shred_index, shred_type) and has valid chunk_index.
|
|
fn check_chunk(
|
|
slot: Slot,
|
|
shred_index: u32,
|
|
shred_type: ShredType,
|
|
num_chunks: u8,
|
|
) -> impl Fn(&DuplicateShred) -> Result<(), Error> {
|
|
move |dup| {
|
|
if dup.slot != slot {
|
|
Err(Error::SlotMismatch)
|
|
} else if dup.shred_index != shred_index {
|
|
Err(Error::ShredIndexMismatch)
|
|
} else if dup.shred_type != shred_type {
|
|
Err(Error::ShredTypeMismatch)
|
|
} else if dup.num_chunks != num_chunks {
|
|
Err(Error::NumChunksMismatch)
|
|
} else if dup.chunk_index >= num_chunks {
|
|
Err(Error::InvalidChunkIndex {
|
|
chunk_index: dup.chunk_index,
|
|
num_chunks,
|
|
})
|
|
} else {
|
|
Ok(())
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Reconstructs the duplicate shreds from chunks of DuplicateShred.
|
|
pub(crate) fn into_shreds(
|
|
slot_leader: &Pubkey,
|
|
chunks: impl IntoIterator<Item = DuplicateShred>,
|
|
) -> Result<(Shred, Shred), Error> {
|
|
let mut chunks = chunks.into_iter();
|
|
let DuplicateShred {
|
|
slot,
|
|
shred_index,
|
|
shred_type,
|
|
num_chunks,
|
|
chunk_index,
|
|
chunk,
|
|
..
|
|
} = chunks.next().ok_or(Error::InvalidDuplicateShreds)?;
|
|
let check_chunk = check_chunk(slot, shred_index, shred_type, num_chunks);
|
|
let mut data = HashMap::new();
|
|
data.insert(chunk_index, chunk);
|
|
for chunk in chunks {
|
|
check_chunk(&chunk)?;
|
|
match data.entry(chunk.chunk_index) {
|
|
Entry::Vacant(entry) => {
|
|
entry.insert(chunk.chunk);
|
|
}
|
|
Entry::Occupied(entry) => {
|
|
if *entry.get() != chunk.chunk {
|
|
return Err(Error::DataChunkMismatch);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if data.len() != num_chunks as usize {
|
|
return Err(Error::MissingDataChunk);
|
|
}
|
|
let data = (0..num_chunks).map(|k| data.remove(&k).unwrap()).concat();
|
|
let proof: DuplicateSlotProof = bincode::deserialize(&data)?;
|
|
if proof.shred1 == proof.shred2 {
|
|
return Err(Error::InvalidDuplicateSlotProof);
|
|
}
|
|
let shred1 = Shred::new_from_serialized_shred(proof.shred1)?;
|
|
let shred2 = Shred::new_from_serialized_shred(proof.shred2)?;
|
|
if shred1.slot() != slot || shred2.slot() != slot {
|
|
Err(Error::SlotMismatch)
|
|
} else if shred1.index() != shred_index || shred2.index() != shred_index {
|
|
Err(Error::ShredIndexMismatch)
|
|
} else if shred1.shred_type() != shred_type || shred2.shred_type() != shred_type {
|
|
Err(Error::ShredTypeMismatch)
|
|
} else if shred1.payload() == shred2.payload() {
|
|
Err(Error::InvalidDuplicateShreds)
|
|
} else if !shred1.verify(slot_leader) || !shred2.verify(slot_leader) {
|
|
Err(Error::InvalidSignature)
|
|
} else {
|
|
Ok((shred1, shred2))
|
|
}
|
|
}
|
|
|
|
impl Sanitize for DuplicateShred {
|
|
fn sanitize(&self) -> Result<(), SanitizeError> {
|
|
sanitize_wallclock(self.wallclock)?;
|
|
if self.chunk_index >= self.num_chunks {
|
|
return Err(SanitizeError::IndexOutOfBounds);
|
|
}
|
|
self.from.sanitize()
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
pub(crate) mod tests {
|
|
use {
|
|
super::*,
|
|
rand::Rng,
|
|
solana_entry::entry::Entry,
|
|
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
|
|
solana_sdk::{
|
|
hash,
|
|
signature::{Keypair, Signer},
|
|
system_transaction,
|
|
},
|
|
std::sync::Arc,
|
|
};
|
|
|
|
#[test]
|
|
fn test_duplicate_shred_header_size() {
|
|
let dup = DuplicateShred {
|
|
from: Pubkey::new_unique(),
|
|
wallclock: u64::MAX,
|
|
slot: Slot::MAX,
|
|
shred_index: u32::MAX,
|
|
shred_type: ShredType::Data,
|
|
num_chunks: u8::MAX,
|
|
chunk_index: u8::MAX,
|
|
chunk: Vec::default(),
|
|
};
|
|
assert_eq!(
|
|
bincode::serialize(&dup).unwrap().len(),
|
|
DUPLICATE_SHRED_HEADER_SIZE
|
|
);
|
|
assert_eq!(
|
|
bincode::serialized_size(&dup).unwrap(),
|
|
DUPLICATE_SHRED_HEADER_SIZE as u64
|
|
);
|
|
}
|
|
|
|
pub(crate) fn new_rand_shred<R: Rng>(
|
|
rng: &mut R,
|
|
next_shred_index: u32,
|
|
shredder: &Shredder,
|
|
keypair: &Keypair,
|
|
) -> Shred {
|
|
let entries: Vec<_> = std::iter::repeat_with(|| {
|
|
let tx = system_transaction::transfer(
|
|
&Keypair::new(), // from
|
|
&Pubkey::new_unique(), // to
|
|
rng.gen(), // lamports
|
|
hash::new_rand(rng), // recent blockhash
|
|
);
|
|
Entry::new(
|
|
&hash::new_rand(rng), // prev_hash
|
|
1, // num_hashes,
|
|
vec![tx], // transactions
|
|
)
|
|
})
|
|
.take(5)
|
|
.collect();
|
|
let (mut data_shreds, _coding_shreds) = shredder.entries_to_shreds(
|
|
keypair,
|
|
&entries,
|
|
true, // is_last_in_slot
|
|
next_shred_index,
|
|
next_shred_index, // next_code_index
|
|
true, // merkle_variant
|
|
&ReedSolomonCache::default(),
|
|
&mut ProcessShredsStats::default(),
|
|
);
|
|
data_shreds.swap_remove(0)
|
|
}
|
|
|
|
#[test]
|
|
fn test_duplicate_shred_round_trip() {
|
|
let mut rng = rand::thread_rng();
|
|
let leader = Arc::new(Keypair::new());
|
|
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
|
|
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
|
|
let next_shred_index = rng.gen_range(0, 32_000);
|
|
let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
|
|
let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
|
|
let leader_schedule = |s| {
|
|
if s == slot {
|
|
Some(leader.pubkey())
|
|
} else {
|
|
None
|
|
}
|
|
};
|
|
let chunks: Vec<_> = from_shred(
|
|
shred1.clone(),
|
|
Pubkey::new_unique(), // self_pubkey
|
|
shred2.payload().clone(),
|
|
Some(leader_schedule),
|
|
rng.gen(), // wallclock
|
|
512, // max_size
|
|
)
|
|
.unwrap()
|
|
.collect();
|
|
assert!(chunks.len() > 4);
|
|
let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap();
|
|
assert_eq!(shred1, shred3);
|
|
assert_eq!(shred2, shred4);
|
|
}
|
|
}
|