Blob verify (#4951)
* Ensure signable data is not out of range * Add a broadcast stage that puts bad sizes in blobs * Resign blob after modifyign size * Remove assertions that fail when size != meta.size
This commit is contained in:
parent
417e8d5064
commit
22ef3c7c54
|
@ -463,7 +463,7 @@ impl BankingStage {
|
||||||
// we just attempted to record) as retryable
|
// we just attempted to record) as retryable
|
||||||
return (res, processed_transactions_indexes);
|
return (res, processed_transactions_indexes);
|
||||||
}
|
}
|
||||||
Err(_) => panic!("Poh recorder returned unexpected error"),
|
Err(e) => panic!(format!("Poh recorder returned unexpected error: {:?}", e)),
|
||||||
}
|
}
|
||||||
poh_record.stop();
|
poh_record.stop();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1490,8 +1490,6 @@ fn recover(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert_ne!(size, 0);
|
|
||||||
|
|
||||||
for i in start_idx..data_end_idx {
|
for i in start_idx..data_end_idx {
|
||||||
let set_relative_idx = erasure_meta.data_index_in_set(i).unwrap() as usize;
|
let set_relative_idx = erasure_meta.data_index_in_set(i).unwrap() as usize;
|
||||||
|
|
||||||
|
|
|
@ -78,7 +78,7 @@ pub struct ErasureMeta {
|
||||||
/// Which erasure set in the slot this is
|
/// Which erasure set in the slot this is
|
||||||
pub set_index: u64,
|
pub set_index: u64,
|
||||||
/// Size of shards in this erasure set
|
/// Size of shards in this erasure set
|
||||||
pub size: usize,
|
size: usize,
|
||||||
/// Bitfield representing presence/absence of data blobs
|
/// Bitfield representing presence/absence of data blobs
|
||||||
data: u64,
|
data: u64,
|
||||||
/// Bitfield representing presence/absence of coding blobs
|
/// Bitfield representing presence/absence of coding blobs
|
||||||
|
@ -106,7 +106,7 @@ impl ErasureMeta {
|
||||||
let (data_missing, coding_missing) =
|
let (data_missing, coding_missing) =
|
||||||
(NUM_DATA - self.num_data(), NUM_CODING - self.num_coding());
|
(NUM_DATA - self.num_data(), NUM_CODING - self.num_coding());
|
||||||
if data_missing > 0 && data_missing + coding_missing <= NUM_CODING {
|
if data_missing > 0 && data_missing + coding_missing <= NUM_CODING {
|
||||||
assert!(self.size != 0);
|
//assert!(self.size != 0);
|
||||||
ErasureMetaStatus::CanRecover
|
ErasureMetaStatus::CanRecover
|
||||||
} else if data_missing == 0 {
|
} else if data_missing == 0 {
|
||||||
ErasureMetaStatus::DataFull
|
ErasureMetaStatus::DataFull
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
//! A stage to broadcast data from a leader node to validators
|
//! A stage to broadcast data from a leader node to validators
|
||||||
|
use self::broadcast_bad_blob_sizes::BroadcastBadBlobSizes;
|
||||||
use self::broadcast_fake_blobs_run::BroadcastFakeBlobsRun;
|
use self::broadcast_fake_blobs_run::BroadcastFakeBlobsRun;
|
||||||
use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun;
|
use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun;
|
||||||
use self::standard_broadcast_run::StandardBroadcastRun;
|
use self::standard_broadcast_run::StandardBroadcastRun;
|
||||||
|
@ -21,6 +22,7 @@ use std::sync::{Arc, RwLock};
|
||||||
use std::thread::{self, Builder, JoinHandle};
|
use std::thread::{self, Builder, JoinHandle};
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
|
mod broadcast_bad_blob_sizes;
|
||||||
mod broadcast_fake_blobs_run;
|
mod broadcast_fake_blobs_run;
|
||||||
mod broadcast_utils;
|
mod broadcast_utils;
|
||||||
mod fail_entry_verification_broadcast_run;
|
mod fail_entry_verification_broadcast_run;
|
||||||
|
@ -38,6 +40,7 @@ pub enum BroadcastStageType {
|
||||||
Standard,
|
Standard,
|
||||||
FailEntryVerification,
|
FailEntryVerification,
|
||||||
BroadcastFakeBlobs,
|
BroadcastFakeBlobs,
|
||||||
|
BroadcastBadBlobSizes,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BroadcastStageType {
|
impl BroadcastStageType {
|
||||||
|
@ -76,6 +79,15 @@ impl BroadcastStageType {
|
||||||
blocktree,
|
blocktree,
|
||||||
BroadcastFakeBlobsRun::new(0),
|
BroadcastFakeBlobsRun::new(0),
|
||||||
),
|
),
|
||||||
|
|
||||||
|
BroadcastStageType::BroadcastBadBlobSizes => BroadcastStage::new(
|
||||||
|
sock,
|
||||||
|
cluster_info,
|
||||||
|
receiver,
|
||||||
|
exit_sender,
|
||||||
|
blocktree,
|
||||||
|
BroadcastBadBlobSizes::new(),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,83 @@
|
||||||
|
use super::*;
|
||||||
|
use crate::packet::BLOB_HEADER_SIZE;
|
||||||
|
use solana_sdk::hash::Hash;
|
||||||
|
use solana_sdk::signature::Signable;
|
||||||
|
|
||||||
|
pub(super) struct BroadcastBadBlobSizes {}
|
||||||
|
|
||||||
|
impl BroadcastBadBlobSizes {
|
||||||
|
pub(super) fn new() -> Self {
|
||||||
|
Self {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BroadcastRun for BroadcastBadBlobSizes {
|
||||||
|
fn run(
|
||||||
|
&mut self,
|
||||||
|
broadcast: &mut Broadcast,
|
||||||
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
|
receiver: &Receiver<WorkingBankEntries>,
|
||||||
|
sock: &UdpSocket,
|
||||||
|
blocktree: &Arc<Blocktree>,
|
||||||
|
) -> Result<()> {
|
||||||
|
// 1) Pull entries from banking stage
|
||||||
|
let mut receive_results = broadcast_utils::recv_slot_blobs(receiver)?;
|
||||||
|
let bank = receive_results.bank.clone();
|
||||||
|
let last_tick = receive_results.last_tick;
|
||||||
|
|
||||||
|
// 2) Convert entries to blobs + generate coding blobs. Set a garbage PoH on the last entry
|
||||||
|
// in the slot to make verification fail on validators
|
||||||
|
if last_tick == bank.max_tick_height() {
|
||||||
|
let mut last_entry = receive_results
|
||||||
|
.ventries
|
||||||
|
.last_mut()
|
||||||
|
.unwrap()
|
||||||
|
.last_mut()
|
||||||
|
.unwrap();
|
||||||
|
last_entry.0.hash = Hash::default();
|
||||||
|
}
|
||||||
|
|
||||||
|
let keypair = &cluster_info.read().unwrap().keypair.clone();
|
||||||
|
let latest_blob_index = blocktree
|
||||||
|
.meta(bank.slot())
|
||||||
|
.expect("Database error")
|
||||||
|
.map(|meta| meta.consumed)
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
let (data_blobs, coding_blobs) = broadcast_utils::entries_to_blobs(
|
||||||
|
receive_results.ventries,
|
||||||
|
&broadcast.thread_pool,
|
||||||
|
latest_blob_index,
|
||||||
|
last_tick,
|
||||||
|
&bank,
|
||||||
|
&keypair,
|
||||||
|
&mut broadcast.coding_generator,
|
||||||
|
);
|
||||||
|
|
||||||
|
for b in data_blobs.iter().chain(coding_blobs.iter()) {
|
||||||
|
let mut w_b = b.write().unwrap();
|
||||||
|
let real_size = w_b.meta.size;
|
||||||
|
// corrupt the size in the header
|
||||||
|
w_b.set_size(std::usize::MAX - BLOB_HEADER_SIZE);
|
||||||
|
// resign the blob
|
||||||
|
w_b.sign(&keypair);
|
||||||
|
// don't corrupt the size in the meta so that broadcast will still work
|
||||||
|
w_b.meta.size = real_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;
|
||||||
|
|
||||||
|
// 3) Start broadcast step
|
||||||
|
let bank_epoch = bank.get_stakers_epoch(bank.slot());
|
||||||
|
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
|
||||||
|
|
||||||
|
// Broadcast data + erasures
|
||||||
|
cluster_info.read().unwrap().broadcast(
|
||||||
|
sock,
|
||||||
|
data_blobs.iter().chain(coding_blobs.iter()),
|
||||||
|
stakes.as_ref(),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -414,7 +414,7 @@ impl Blob {
|
||||||
|
|
||||||
let bytes = &data[..data_len];
|
let bytes = &data[..data_len];
|
||||||
blob.data[..data_len].copy_from_slice(bytes);
|
blob.data[..data_len].copy_from_slice(bytes);
|
||||||
blob.meta.size = blob.data_size() as usize;
|
blob.meta.size = data_len;
|
||||||
blob
|
blob
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -504,7 +504,10 @@ impl Blob {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn data_size(&self) -> u64 {
|
pub fn data_size(&self) -> u64 {
|
||||||
LittleEndian::read_u64(&self.data[SIZE_RANGE])
|
cmp::min(
|
||||||
|
LittleEndian::read_u64(&self.data[SIZE_RANGE]),
|
||||||
|
BLOB_SIZE as u64,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_data_size(&mut self, size: u64) {
|
pub fn set_data_size(&mut self, size: u64) {
|
||||||
|
|
|
@ -237,11 +237,27 @@ fn test_listener_startup() {
|
||||||
#[test]
|
#[test]
|
||||||
#[serial]
|
#[serial]
|
||||||
fn test_fail_entry_verification_leader() {
|
fn test_fail_entry_verification_leader() {
|
||||||
|
test_faulty_node(BroadcastStageType::FailEntryVerification);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[serial]
|
||||||
|
fn test_bad_blob_size_leader() {
|
||||||
|
test_faulty_node(BroadcastStageType::BroadcastBadBlobSizes);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[ignore]
|
||||||
|
fn test_fake_blobs_broadcast_leader() {
|
||||||
|
test_faulty_node(BroadcastStageType::BroadcastFakeBlobs);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_faulty_node(faulty_node_type: BroadcastStageType) {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
let num_nodes = 4;
|
let num_nodes = 4;
|
||||||
let validator_config = ValidatorConfig::default();
|
let validator_config = ValidatorConfig::default();
|
||||||
let mut error_validator_config = ValidatorConfig::default();
|
let mut error_validator_config = ValidatorConfig::default();
|
||||||
error_validator_config.broadcast_stage_type = BroadcastStageType::FailEntryVerification;
|
error_validator_config.broadcast_stage_type = faulty_node_type.clone();
|
||||||
let mut validator_configs = vec![validator_config; num_nodes - 1];
|
let mut validator_configs = vec![validator_config; num_nodes - 1];
|
||||||
validator_configs.push(error_validator_config);
|
validator_configs.push(error_validator_config);
|
||||||
let mut node_stakes = vec![100; num_nodes - 1];
|
let mut node_stakes = vec![100; num_nodes - 1];
|
||||||
|
@ -274,61 +290,7 @@ fn test_fail_entry_verification_leader() {
|
||||||
let corrupt_node = cluster
|
let corrupt_node = cluster
|
||||||
.fullnode_infos
|
.fullnode_infos
|
||||||
.iter()
|
.iter()
|
||||||
.find(|(_, v)| v.config.broadcast_stage_type == BroadcastStageType::FailEntryVerification)
|
.find(|(_, v)| v.config.broadcast_stage_type == faulty_node_type)
|
||||||
.unwrap()
|
|
||||||
.0;
|
|
||||||
let mut ignore = HashSet::new();
|
|
||||||
ignore.insert(*corrupt_node);
|
|
||||||
|
|
||||||
// Verify that we can still spend and verify even in the presence of corrupt nodes
|
|
||||||
cluster_tests::spend_and_verify_all_nodes(
|
|
||||||
&cluster.entry_point_info,
|
|
||||||
&cluster.funding_keypair,
|
|
||||||
num_nodes,
|
|
||||||
ignore,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[ignore]
|
|
||||||
fn test_fake_blobs_broadcast_leader() {
|
|
||||||
solana_logger::setup();
|
|
||||||
let num_nodes = 4;
|
|
||||||
let validator_config = ValidatorConfig::default();
|
|
||||||
let mut error_validator_config = ValidatorConfig::default();
|
|
||||||
error_validator_config.broadcast_stage_type = BroadcastStageType::BroadcastFakeBlobs;
|
|
||||||
let mut validator_configs = vec![validator_config; num_nodes - 1];
|
|
||||||
validator_configs.push(error_validator_config);
|
|
||||||
|
|
||||||
let cluster_config = ClusterConfig {
|
|
||||||
cluster_lamports: 10_000,
|
|
||||||
node_stakes: vec![100; 4],
|
|
||||||
validator_configs: validator_configs,
|
|
||||||
slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH * 2 as u64,
|
|
||||||
stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH * 2 as u64,
|
|
||||||
..ClusterConfig::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
let cluster = LocalCluster::new(&cluster_config);
|
|
||||||
let epoch_schedule = EpochSchedule::new(
|
|
||||||
cluster_config.slots_per_epoch,
|
|
||||||
cluster_config.stakers_slot_offset,
|
|
||||||
true,
|
|
||||||
);
|
|
||||||
let num_warmup_epochs = epoch_schedule.get_stakers_epoch(0) + 1;
|
|
||||||
|
|
||||||
// Wait for the corrupted leader to be scheduled afer the warmup epochs expire
|
|
||||||
cluster_tests::sleep_n_epochs(
|
|
||||||
(num_warmup_epochs + 1) as f64,
|
|
||||||
&cluster.genesis_block.poh_config,
|
|
||||||
cluster_config.ticks_per_slot,
|
|
||||||
cluster_config.slots_per_epoch,
|
|
||||||
);
|
|
||||||
|
|
||||||
let corrupt_node = cluster
|
|
||||||
.fullnode_infos
|
|
||||||
.iter()
|
|
||||||
.find(|(_, v)| v.config.broadcast_stage_type == BroadcastStageType::BroadcastFakeBlobs)
|
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.0;
|
.0;
|
||||||
let mut ignore = HashSet::new();
|
let mut ignore = HashSet::new();
|
||||||
|
|
Loading…
Reference in New Issue