From 22ef3c7c547045a6009060d352ef69fc597c80b1 Mon Sep 17 00:00:00 2001
From: carllin
Date: Mon, 8 Jul 2019 18:21:52 -0700
Subject: [PATCH] Blob verify (#4951)

* Ensure signable data is not out of range

* Add a broadcast stage that puts bad sizes in blobs

* Resign blob after modifying size

* Remove assertions that fail when size != meta.size
---
 core/src/banking_stage.rs       |  2 +-
 core/src/blocktree.rs           |  2 -
 core/src/blocktree/meta.rs      |  4 +-
 core/src/broadcast_stage.rs     | 12 +++
 .../broadcast_bad_blob_sizes.rs | 83 +++++++++++++++++++
 core/src/packet.rs              |  7 +-
 core/tests/local_cluster.rs     | 74 ++++-------------
 7 files changed, 121 insertions(+), 63 deletions(-)
 create mode 100644 core/src/broadcast_stage/broadcast_bad_blob_sizes.rs

diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index 37d501a9f1..474d8038a3 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -463,7 +463,7 @@ impl BankingStage {
                     // we just attempted to record) as retryable
                     return (res, processed_transactions_indexes);
                 }
-                Err(_) => panic!("Poh recorder returned unexpected error"),
+                Err(e) => panic!("Poh recorder returned unexpected error: {:?}", e),
             }
             poh_record.stop();
         }
diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index 660a206e4f..e0984b72c8 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -1490,8 +1490,6 @@ fn recover(
         }
     }
 
-    assert_ne!(size, 0);
-
     for i in start_idx..data_end_idx {
         let set_relative_idx = erasure_meta.data_index_in_set(i).unwrap() as usize;
 
diff --git a/core/src/blocktree/meta.rs b/core/src/blocktree/meta.rs
index b34576d141..599f04cdd9 100644
--- a/core/src/blocktree/meta.rs
+++ b/core/src/blocktree/meta.rs
@@ -78,7 +78,7 @@ pub struct ErasureMeta {
     /// Which erasure set in the slot this is
    pub set_index: u64,
     /// Size of shards in this erasure set
-    pub size: usize,
+    size: usize,
     /// Bitfield representing presence/absence of data blobs
     data: u64,
     /// Bitfield representing presence/absence of coding blobs
@@ -106,7 +106,7 @@ impl ErasureMeta {
         let (data_missing, coding_missing) =
             (NUM_DATA - self.num_data(), NUM_CODING - self.num_coding());
         if data_missing > 0 && data_missing + coding_missing <= NUM_CODING {
-            assert!(self.size != 0);
+            // don't assert size != 0 here; it can fail when size != meta.size
             ErasureMetaStatus::CanRecover
         } else if data_missing == 0 {
             ErasureMetaStatus::DataFull
diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index c69c6dec85..f558e52d67 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -1,4 +1,5 @@
 //! A stage to broadcast data from a leader node to validators
+use self::broadcast_bad_blob_sizes::BroadcastBadBlobSizes;
 use self::broadcast_fake_blobs_run::BroadcastFakeBlobsRun;
 use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun;
 use self::standard_broadcast_run::StandardBroadcastRun;
@@ -21,6 +22,7 @@ use std::sync::{Arc, RwLock};
 use std::thread::{self, Builder, JoinHandle};
 use std::time::Instant;
 
+mod broadcast_bad_blob_sizes;
 mod broadcast_fake_blobs_run;
 mod broadcast_utils;
 mod fail_entry_verification_broadcast_run;
@@ -38,6 +40,7 @@ pub enum BroadcastStageType {
     Standard,
     FailEntryVerification,
     BroadcastFakeBlobs,
+    BroadcastBadBlobSizes,
 }
 
 impl BroadcastStageType {
@@ -76,6 +79,15 @@ impl BroadcastStageType {
                 blocktree,
                 BroadcastFakeBlobsRun::new(0),
             ),
+
+            BroadcastStageType::BroadcastBadBlobSizes => BroadcastStage::new(
+                sock,
+                cluster_info,
+                receiver,
+                exit_sender,
+                blocktree,
+                BroadcastBadBlobSizes::new(),
+            ),
         }
     }
 }
diff --git a/core/src/broadcast_stage/broadcast_bad_blob_sizes.rs b/core/src/broadcast_stage/broadcast_bad_blob_sizes.rs
new file mode 100644
index 0000000000..0bfeccc59f
--- /dev/null
+++ b/core/src/broadcast_stage/broadcast_bad_blob_sizes.rs
@@ -0,0 +1,83 @@
+use super::*;
+use crate::packet::BLOB_HEADER_SIZE;
+use solana_sdk::hash::Hash;
+use solana_sdk::signature::Signable;
+
+pub(super) struct BroadcastBadBlobSizes {}
+
+impl BroadcastBadBlobSizes {
+    pub(super) fn new() -> Self {
+        Self {}
+    }
+}
+
+impl BroadcastRun for BroadcastBadBlobSizes {
+    fn run(
+        &mut self,
+        broadcast: &mut Broadcast,
+        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        receiver: &Receiver<WorkingBankEntries>,
+        sock: &UdpSocket,
+        blocktree: &Arc<Blocktree>,
+    ) -> Result<()> {
+        // 1) Pull entries from banking stage
+        let mut receive_results = broadcast_utils::recv_slot_blobs(receiver)?;
+        let bank = receive_results.bank.clone();
+        let last_tick = receive_results.last_tick;
+
+        // 2) Convert entries to blobs + generate coding blobs. Set a garbage PoH on the last entry
+        // in the slot to make verification fail on validators
+        if last_tick == bank.max_tick_height() {
+            let last_entry = receive_results
+                .ventries
+                .last_mut()
+                .unwrap()
+                .last_mut()
+                .unwrap();
+            last_entry.0.hash = Hash::default();
+        }
+
+        let keypair = &cluster_info.read().unwrap().keypair.clone();
+        let latest_blob_index = blocktree
+            .meta(bank.slot())
+            .expect("Database error")
+            .map(|meta| meta.consumed)
+            .unwrap_or(0);
+
+        let (data_blobs, coding_blobs) = broadcast_utils::entries_to_blobs(
+            receive_results.ventries,
+            &broadcast.thread_pool,
+            latest_blob_index,
+            last_tick,
+            &bank,
+            &keypair,
+            &mut broadcast.coding_generator,
+        );
+
+        for b in data_blobs.iter().chain(coding_blobs.iter()) {
+            let mut w_b = b.write().unwrap();
+            let real_size = w_b.meta.size;
+            // corrupt the size in the header
+            w_b.set_size(std::usize::MAX - BLOB_HEADER_SIZE);
+            // resign the blob
+            w_b.sign(&keypair);
+            // don't corrupt the size in the meta so that broadcast will still work
+            w_b.meta.size = real_size;
+        }
+
+        blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;
+
+        // 3) Start broadcast step
+        let bank_epoch = bank.get_stakers_epoch(bank.slot());
+        let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
+
+        // Broadcast data + erasures
+        cluster_info.read().unwrap().broadcast(
+            sock,
+            data_blobs.iter().chain(coding_blobs.iter()),
+            stakes.as_ref(),
+        )?;
+
+        Ok(())
+    }
+}
diff --git a/core/src/packet.rs b/core/src/packet.rs
index e69cdae4bd..ffd70df022 100644
--- a/core/src/packet.rs
+++ b/core/src/packet.rs
@@ -414,7 +414,7 @@ impl Blob {
 
         let bytes = &data[..data_len];
         blob.data[..data_len].copy_from_slice(bytes);
-        blob.meta.size = blob.data_size() as usize;
+        blob.meta.size = data_len;
         blob
     }
 
@@ -504,7 +504,10 @@ impl Blob {
     }
 
     pub fn data_size(&self) -> u64 {
-        LittleEndian::read_u64(&self.data[SIZE_RANGE])
+        cmp::min(
+            LittleEndian::read_u64(&self.data[SIZE_RANGE]),
+            BLOB_SIZE as u64,
+        )
     }
 
     pub fn set_data_size(&mut self, size: u64) {
diff --git a/core/tests/local_cluster.rs b/core/tests/local_cluster.rs
index bb478d21cb..f87f2ae68d 100644
--- a/core/tests/local_cluster.rs
+++ b/core/tests/local_cluster.rs
@@ -237,11 +237,27 @@ fn test_listener_startup() {
 #[test]
 #[serial]
 fn test_fail_entry_verification_leader() {
+    test_faulty_node(BroadcastStageType::FailEntryVerification);
+}
+
+#[test]
+#[serial]
+fn test_bad_blob_size_leader() {
+    test_faulty_node(BroadcastStageType::BroadcastBadBlobSizes);
+}
+
+#[test]
+#[ignore]
+fn test_fake_blobs_broadcast_leader() {
+    test_faulty_node(BroadcastStageType::BroadcastFakeBlobs);
+}
+
+fn test_faulty_node(faulty_node_type: BroadcastStageType) {
     solana_logger::setup();
     let num_nodes = 4;
     let validator_config = ValidatorConfig::default();
     let mut error_validator_config = ValidatorConfig::default();
-    error_validator_config.broadcast_stage_type = BroadcastStageType::FailEntryVerification;
+    error_validator_config.broadcast_stage_type = faulty_node_type.clone();
     let mut validator_configs = vec![validator_config; num_nodes - 1];
     validator_configs.push(error_validator_config);
     let mut node_stakes = vec![100; num_nodes - 1];
@@ -274,61 +290,7 @@ fn test_fail_entry_verification_leader() {
     let corrupt_node = cluster
         .fullnode_infos
         .iter()
-        .find(|(_, v)| v.config.broadcast_stage_type == BroadcastStageType::FailEntryVerification)
-        .unwrap()
-        .0;
-    let mut ignore = HashSet::new();
-    ignore.insert(*corrupt_node);
-
-    // Verify that we can still spend and verify even in the presence of corrupt nodes
-    cluster_tests::spend_and_verify_all_nodes(
-        &cluster.entry_point_info,
-        &cluster.funding_keypair,
-        num_nodes,
-        ignore,
-    );
-}
-
-#[test]
-#[ignore]
-fn test_fake_blobs_broadcast_leader() {
-    solana_logger::setup();
-    let num_nodes = 4;
-    let validator_config = ValidatorConfig::default();
-    let mut error_validator_config = ValidatorConfig::default();
-    error_validator_config.broadcast_stage_type = BroadcastStageType::BroadcastFakeBlobs;
-    let mut validator_configs = vec![validator_config; num_nodes - 1];
-    validator_configs.push(error_validator_config);
-
-    let cluster_config = ClusterConfig {
-        cluster_lamports: 10_000,
-        node_stakes: vec![100; 4],
-        validator_configs: validator_configs,
-        slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH * 2 as u64,
-        stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH * 2 as u64,
-        ..ClusterConfig::default()
-    };
-
-    let cluster = LocalCluster::new(&cluster_config);
-    let epoch_schedule = EpochSchedule::new(
-        cluster_config.slots_per_epoch,
-        cluster_config.stakers_slot_offset,
-        true,
-    );
-    let num_warmup_epochs = epoch_schedule.get_stakers_epoch(0) + 1;
-
-    // Wait for the corrupted leader to be scheduled afer the warmup epochs expire
-    cluster_tests::sleep_n_epochs(
-        (num_warmup_epochs + 1) as f64,
-        &cluster.genesis_block.poh_config,
-        cluster_config.ticks_per_slot,
-        cluster_config.slots_per_epoch,
-    );
-
-    let corrupt_node = cluster
-        .fullnode_infos
-        .iter()
-        .find(|(_, v)| v.config.broadcast_stage_type == BroadcastStageType::BroadcastFakeBlobs)
+        .find(|(_, v)| v.config.broadcast_stage_type == faulty_node_type)
         .unwrap()
         .0;
     let mut ignore = HashSet::new();
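
Why the data_size() clamp is the load-bearing piece: the size field in a blob
header arrives off the wire, so a malicious leader fully controls it, and any
code that turns it into a slice bound must clamp it to the buffer first. Below
is a minimal standalone sketch of that idea; `ToyBlob`, the 64 KiB `BLOB_SIZE`
value, and the `SIZE_RANGE` offsets are illustrative assumptions, not the
actual layout in core/src/packet.rs:

    use std::convert::TryInto;
    use std::ops::Range;

    // Stand-ins for the constants in packet.rs (values are assumptions).
    const BLOB_SIZE: usize = 64 * 1024;
    const SIZE_RANGE: Range<usize> = 72..80;

    struct ToyBlob {
        data: Vec<u8>, // fixed-capacity buffer, like Blob.data
    }

    impl ToyBlob {
        fn new() -> Self {
            ToyBlob { data: vec![0u8; BLOB_SIZE] }
        }

        // Write the little-endian size field, like Blob::set_data_size().
        fn set_data_size(&mut self, size: u64) {
            self.data[SIZE_RANGE].copy_from_slice(&size.to_le_bytes());
        }

        // The fix: never trust the wire-supplied size past the buffer bounds.
        fn data_size(&self) -> u64 {
            let claimed = u64::from_le_bytes(self.data[SIZE_RANGE].try_into().unwrap());
            claimed.min(BLOB_SIZE as u64)
        }
    }

    fn main() {
        let mut b = ToyBlob::new();
        // What BroadcastBadBlobSizes simulates: a maximal bogus header size.
        b.set_data_size(u64::MAX);
        // Unclamped, downstream indexing like &data[..size] would be out of
        // range; clamped, it stays within the allocation.
        assert_eq!(b.data_size(), BLOB_SIZE as u64);
    }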
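The blob loop in broadcast_bad_blob_sizes.rs also depends on ordering: the
bogus size is written before signing, so the blob carries a perfectly valid
signature and a validator can only reject it via the size sanity check, while
meta.size is restored afterwards because it is local bookkeeping that drives
how many bytes the leader's own write/broadcast path handles. A toy sketch of
that order of operations, where the "signature" is a stand-in checksum rather
than the real ed25519 Signable impl:

    // Toy model: header_size is in the signed region, meta_size is not.
    struct ToyBlob {
        header_size: u64,  // serialized and signed, travels on the wire
        meta_size: usize,  // local bookkeeping: neither signed nor sent
        signature: [u8; 8],
    }

    impl ToyBlob {
        fn signable_data(&self) -> [u8; 8] {
            self.header_size.to_le_bytes()
        }
        fn sign(&mut self) {
            // stand-in for signing with the leader keypair
            self.signature = self.signable_data();
        }
        fn verify(&self) -> bool {
            self.signature == self.signable_data()
        }
    }

    fn main() {
        let real_size = 512usize;
        let mut b = ToyBlob {
            header_size: real_size as u64,
            meta_size: real_size,
            signature: [0; 8],
        };

        b.header_size = u64::MAX; // corrupt the signed size field...
        b.sign();                 // ...then resign so the signature checks out
        b.meta_size = real_size;  // ...but keep meta.size honest locally

        // Signature verification alone cannot catch the corruption; only the
        // data_size() clamp / size sanity check on the receiving side can.
        assert!(b.verify());
        assert_ne!(b.header_size as usize, b.meta_size);
    }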