Broadcast run for injecting fake blobs in turbine (#4889)

* Broadcast run for injecting fake blobs in turbine

* address review comments

* new local cluster test that uses fake blob broadcast

* added a test to make sure tvu_peers ordering is guaranteed
Pankaj Garg 2019-07-01 17:54:03 -07:00 committed by GitHub
parent 091999a17e
commit 3615445a12
3 changed files with 232 additions and 0 deletions
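
For orientation, here is a minimal sketch of how a local cluster test can opt one node into the new broadcast run. The `ValidatorConfig`, `BroadcastStageType`, `ClusterConfig`, and `LocalCluster` names are the ones exercised by the diffs below; the function wrapper is illustrative and imports are elided.

// Sketch only: boot a 4-node local cluster where one node broadcasts fake blobs.
fn fake_blob_cluster() -> LocalCluster {
    // One node gets the faulty broadcast stage; the other three stay standard.
    let mut faulty_config = ValidatorConfig::default();
    faulty_config.broadcast_stage_type = BroadcastStageType::BroadcastFakeBlobs;

    let mut validator_configs = vec![ValidatorConfig::default(); 3];
    validator_configs.push(faulty_config);

    LocalCluster::new(&ClusterConfig {
        cluster_lamports: 10_000,
        node_stakes: vec![100; 4],
        validator_configs,
        ..ClusterConfig::default()
    })
}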


@@ -1,4 +1,5 @@
//! A stage to broadcast data from a leader node to validators
use self::broadcast_fake_blobs_run::BroadcastFakeBlobsRun;
use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun;
use self::standard_broadcast_run::StandardBroadcastRun;
use crate::blocktree::Blocktree;
@@ -20,6 +21,7 @@ use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Instant;
mod broadcast_fake_blobs_run;
mod broadcast_utils;
mod fail_entry_verification_broadcast_run;
mod standard_broadcast_run;
@@ -35,6 +37,7 @@ pub enum BroadcastStageReturnType {
pub enum BroadcastStageType {
Standard,
FailEntryVerification,
BroadcastFakeBlobs,
}
impl BroadcastStageType {
@@ -64,6 +67,15 @@ impl BroadcastStageType {
blocktree,
FailEntryVerificationBroadcastRun::new(),
),
BroadcastStageType::BroadcastFakeBlobs => BroadcastStage::new(
sock,
cluster_info,
receiver,
exit_sender,
blocktree,
BroadcastFakeBlobsRun::new(0),
),
}
}
}


@@ -0,0 +1,166 @@
use super::*;
use crate::entry::Entry;
use solana_sdk::hash::Hash;
pub(super) struct BroadcastFakeBlobsRun {
last_blockhash: Hash,
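// Peers with index <= partition receive the fake blobs; the rest get the real ones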
partition: usize,
}
impl BroadcastFakeBlobsRun {
pub(super) fn new(partition: usize) -> Self {
Self {
last_blockhash: Hash::default(),
partition,
}
}
}
impl BroadcastRun for BroadcastFakeBlobsRun {
fn run(
&mut self,
broadcast: &mut Broadcast,
cluster_info: &Arc<RwLock<ClusterInfo>>,
receiver: &Receiver<WorkingBankEntries>,
sock: &UdpSocket,
blocktree: &Arc<Blocktree>,
) -> Result<()> {
// 1) Pull entries from banking stage
let receive_results = broadcast_utils::recv_slot_blobs(receiver)?;
let bank = receive_results.bank.clone();
let last_tick = receive_results.last_tick;
let keypair = &cluster_info.read().unwrap().keypair.clone();
let latest_blob_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);
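// 2) Convert entries to blobs + generate coding blobs (the fake entries below get the same treatment)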
let (data_blobs, coding_blobs) = broadcast_utils::entries_to_blobs(
receive_results.ventries,
&broadcast.thread_pool,
latest_blob_index,
last_tick,
&bank,
&keypair,
&mut broadcast.coding_generator,
);
// If the last blockhash is the default, a new block is being created,
// so grab the last blockhash from the parent bank
if self.last_blockhash == Hash::default() {
self.last_blockhash = bank.parent().unwrap().last_blockhash();
}
let fake_ventries: Vec<_> = (0..receive_results.num_entries)
.map(|_| vec![(Entry::new(&self.last_blockhash, 0, vec![]), 0)])
.collect();
let (fake_data_blobs, fake_coding_blobs) = broadcast_utils::entries_to_blobs(
fake_ventries,
&broadcast.thread_pool,
latest_blob_index,
last_tick,
&bank,
&keypair,
&mut broadcast.coding_generator,
);
// If it's the last tick, reset the last blockhash to default;
// this will cause the next run to grab the last bank's blockhash
if last_tick == bank.max_tick_height() {
self.last_blockhash = Hash::default();
}
blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;
// Set the forwarded flag to true, so that the blobs won't be forwarded to peers
data_blobs
.iter()
.for_each(|blob| blob.write().unwrap().set_forwarded(true));
coding_blobs
.iter()
.for_each(|blob| blob.write().unwrap().set_forwarded(true));
fake_data_blobs
.iter()
.for_each(|blob| blob.write().unwrap().set_forwarded(true));
fake_coding_blobs
.iter()
.for_each(|blob| blob.write().unwrap().set_forwarded(true));
// 3) Start broadcast step
let peers = cluster_info.read().unwrap().tvu_peers();
peers.iter().enumerate().for_each(|(i, peer)| {
if i <= self.partition {
// Send fake blobs to the first N peers
fake_data_blobs.iter().for_each(|b| {
let blob = b.read().unwrap();
sock.send_to(&blob.data[..blob.meta.size], &peer.tvu)
.unwrap();
});
fake_coding_blobs.iter().for_each(|b| {
let blob = b.read().unwrap();
sock.send_to(&blob.data[..blob.meta.size], &peer.tvu)
.unwrap();
});
} else {
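// The remaining peers receive the real data and coding blobs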
data_blobs.iter().for_each(|b| {
let blob = b.read().unwrap();
sock.send_to(&blob.data[..blob.meta.size], &peer.tvu)
.unwrap();
});
coding_blobs.iter().for_each(|b| {
let blob = b.read().unwrap();
sock.send_to(&blob.data[..blob.meta.size], &peer.tvu)
.unwrap();
});
}
});
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::contact_info::ContactInfo;
use solana_sdk::pubkey::Pubkey;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
#[test]
fn test_tvu_peers_ordering() {
let mut cluster = ClusterInfo::new_with_invalid_keypair(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
));
cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
8080,
)));
cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)),
8080,
)));
cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)),
8080,
)));
cluster.insert_info(ContactInfo::new_with_socketaddr(&SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(192, 168, 1, 4)),
8080,
)));
let tvu_peers1 = cluster.tvu_peers();
(0..5).for_each(|_| {
cluster
.tvu_peers()
.iter()
.zip(tvu_peers1.iter())
.for_each(|(v1, v2)| {
assert_eq!(v1, v2);
});
});
}
}


@@ -274,6 +274,60 @@ fn test_fail_entry_verification_leader() {
);
}
#[test]
#[ignore]
fn test_fake_blobs_broadcast_leader() {
solana_logger::setup();
let num_nodes = 4;
let validator_config = ValidatorConfig::default();
let mut error_validator_config = ValidatorConfig::default();
error_validator_config.broadcast_stage_type = BroadcastStageType::BroadcastFakeBlobs;
let mut validator_configs = vec![validator_config; num_nodes - 1];
validator_configs.push(error_validator_config);
let cluster_config = ClusterConfig {
cluster_lamports: 10_000,
node_stakes: vec![100; 4],
validator_configs: validator_configs,
slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH * 2 as u64,
stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH * 2 as u64,
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&cluster_config);
let epoch_schedule = EpochSchedule::new(
cluster_config.slots_per_epoch,
cluster_config.stakers_slot_offset,
true,
);
let num_warmup_epochs = epoch_schedule.get_stakers_epoch(0) + 1;
// Wait for the corrupted leader to be scheduled after the warmup epochs expire
cluster_tests::sleep_n_epochs(
(num_warmup_epochs + 1) as f64,
&cluster.genesis_block.poh_config,
cluster_config.ticks_per_slot,
cluster_config.slots_per_epoch,
);
let corrupt_node = cluster
.fullnode_infos
.iter()
.find(|(_, v)| v.config.broadcast_stage_type == BroadcastStageType::BroadcastFakeBlobs)
.unwrap()
.0;
let mut ignore = HashSet::new();
ignore.insert(*corrupt_node);
// Verify that we can still spend and verify even in the presence of corrupt nodes
cluster_tests::spend_and_verify_all_nodes(
&cluster.entry_point_info,
&cluster.funding_keypair,
num_nodes,
ignore,
);
}
#[test]
fn test_repairman_catchup() {
run_repairman_catchup(3);