Add local cluster tests that broadcast duplicate slots (#13995)

* Add duplicate node local cluster test

* fix clippy

* remove dupe test
Justin Starry 2021-06-09 15:01:48 -07:00 committed by GitHub
parent 84f121881e
commit 050bb5446d
6 changed files with 397 additions and 18 deletions

View File

@@ -1,6 +1,7 @@
//! A stage to broadcast data from a leader node to validators
#![allow(clippy::rc_buffer)]
use self::{
broadcast_duplicates_run::BroadcastDuplicatesRun,
broadcast_fake_shreds_run::BroadcastFakeShredsRun, broadcast_metrics::*,
fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
standard_broadcast_run::StandardBroadcastRun,
@@ -35,6 +36,7 @@ use std::{
time::{Duration, Instant},
};
mod broadcast_duplicates_run;
mod broadcast_fake_shreds_run;
pub mod broadcast_metrics;
pub(crate) mod broadcast_utils;
@@ -52,11 +54,20 @@ pub enum BroadcastStageReturnType {
ChannelDisconnected,
}
#[derive(PartialEq, Clone, Debug)]
pub struct BroadcastDuplicatesConfig {
/// Percentage of stake to send a different version of slots to
pub stake_partition: u8,
/// Number of slots to wait before sending duplicate shreds
pub duplicate_send_delay: usize,
}
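// Example configuration (the values used by the local-cluster test later in this
// change): send a conflicting version of each slot to roughly half of the stake and
// delay delivery of the duplicate shreds by one slot:
//
//     BroadcastDuplicatesConfig { stake_partition: 50, duplicate_send_delay: 1 }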
#[derive(PartialEq, Clone, Debug)]
pub enum BroadcastStageType {
Standard,
FailEntryVerification,
BroadcastFakeShreds,
BroadcastDuplicates(BroadcastDuplicatesConfig),
}
impl BroadcastStageType {
@@ -101,6 +112,16 @@ impl BroadcastStageType {
blockstore,
BroadcastFakeShredsRun::new(keypair, 0, shred_version),
),
BroadcastStageType::BroadcastDuplicates(config) => BroadcastStage::new(
sock,
cluster_info,
receiver,
retransmit_slots_receiver,
exit_sender,
blockstore,
BroadcastDuplicatesRun::new(keypair, shred_version, config.clone()),
),
}
}
}

View File

@@ -0,0 +1,333 @@
use super::broadcast_utils::ReceiveResults;
use super::*;
use log::*;
use solana_ledger::entry::{create_ticks, Entry, EntrySlice};
use solana_ledger::shred::Shredder;
use solana_runtime::blockhash_queue::BlockhashQueue;
use solana_sdk::clock::Slot;
use solana_sdk::fee_calculator::FeeCalculator;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, Signer};
use solana_sdk::transaction::Transaction;
use std::collections::VecDeque;
use std::sync::Mutex;
// Queue which facilitates delivering shreds with a delay
type DelayedQueue = VecDeque<(Option<Pubkey>, Option<Vec<Shred>>)>;
#[derive(Clone)]
pub(super) struct BroadcastDuplicatesRun {
config: BroadcastDuplicatesConfig,
// Local queue for broadcast to track which duplicate blockhashes we've sent
duplicate_queue: BlockhashQueue,
// Shared queue between broadcast and transmit threads
delayed_queue: Arc<Mutex<DelayedQueue>>,
// Buffer for duplicate entries
duplicate_entries_buffer: Vec<Entry>,
last_duplicate_entry_hash: Hash,
last_broadcast_slot: Slot,
next_shred_index: u32,
shred_version: u16,
keypair: Arc<Keypair>,
}
impl BroadcastDuplicatesRun {
pub(super) fn new(
keypair: Arc<Keypair>,
shred_version: u16,
config: BroadcastDuplicatesConfig,
) -> Self {
let mut delayed_queue = DelayedQueue::new();
delayed_queue.resize(config.duplicate_send_delay, (None, None));
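// Because transmit() only pops this queue once it grows past `duplicate_send_delay`
// entries, the placeholders inserted above delay delivery of each pushed shred by
// roughly that many slots.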
Self {
config,
delayed_queue: Arc::new(Mutex::new(delayed_queue)),
duplicate_queue: BlockhashQueue::default(),
duplicate_entries_buffer: vec![],
next_shred_index: u32::MAX,
last_broadcast_slot: 0,
last_duplicate_entry_hash: Hash::default(),
shred_version,
keypair,
}
}
fn queue_or_create_duplicate_entries(
&mut self,
bank: &Arc<Bank>,
receive_results: &ReceiveResults,
) -> (Vec<Entry>, u32) {
// If the last entry hash is default, grab the last blockhash from the parent bank
if self.last_duplicate_entry_hash == Hash::default() {
self.last_duplicate_entry_hash = bank.last_blockhash();
}
// Create duplicate entries by..
// 1) rearranging real entries so that all transaction entries are moved to
// the front and tick entries are moved to the back.
// 2) setting all transaction entries to zero hashes and all tick entries to `hashes_per_tick`.
// 3) removing any transactions which reference blockhashes which aren't in the
// duplicate blockhash queue.
let (duplicate_entries, next_shred_index) = if bank.slot() > MINIMUM_DUPLICATE_SLOT {
let mut tx_entries: Vec<Entry> = receive_results
.entries
.iter()
.filter_map(|entry| {
if entry.is_tick() {
return None;
}
let transactions: Vec<Transaction> = entry
.transactions
.iter()
.filter(|tx| {
self.duplicate_queue
.get_hash_age(&tx.message.recent_blockhash)
.is_some()
})
.cloned()
.collect();
if !transactions.is_empty() {
Some(Entry::new_mut(
&mut self.last_duplicate_entry_hash,
&mut 0,
transactions,
))
} else {
None
}
})
.collect();
let mut tick_entries = create_ticks(
receive_results.entries.tick_count(),
bank.hashes_per_tick().unwrap_or_default(),
self.last_duplicate_entry_hash,
);
self.duplicate_entries_buffer.append(&mut tx_entries);
self.duplicate_entries_buffer.append(&mut tick_entries);
// Only send out duplicate entries when the block is finished; otherwise the
// recipients will start repairing shreds they haven't received yet and hit
// duplicate slot issues before we want them to.
let entries = if receive_results.last_tick_height == bank.max_tick_height() {
self.duplicate_entries_buffer.drain(..).collect()
} else {
vec![]
};
// Set next shred index to 0 since we are sending the full slot
(entries, 0)
} else {
// Send real entries until we hit min duplicate slot
(receive_results.entries.clone(), self.next_shred_index)
};
// Save last duplicate entry hash to avoid invalid entry hash errors
if let Some(last_duplicate_entry) = duplicate_entries.last() {
self.last_duplicate_entry_hash = last_duplicate_entry.hash;
}
(duplicate_entries, next_shred_index)
}
}
/// Duplicate slots should only be sent once all validators have started.
/// This constant is intended to be used as a buffer so that all validators
/// are live before sending duplicate slots.
pub const MINIMUM_DUPLICATE_SLOT: Slot = 20;
impl BroadcastRun for BroadcastDuplicatesRun {
fn run(
&mut self,
blockstore: &Arc<Blockstore>,
receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
) -> Result<()> {
// 1) Pull entries from banking stage
let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
let bank = receive_results.bank.clone();
let last_tick_height = receive_results.last_tick_height;
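// next_shred_index was initialized to the u32::MAX sentinel; on the first run,
// recover the starting index from the number of shreds already recorded for this slot.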
if self.next_shred_index == u32::MAX {
self.next_shred_index = blockstore
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0) as u32
}
// We were not the leader, but just became leader again
if bank.slot() > self.last_broadcast_slot + 1 {
self.last_duplicate_entry_hash = Hash::default();
}
self.last_broadcast_slot = bank.slot();
let shredder = Shredder::new(
bank.slot(),
bank.parent().unwrap().slot(),
self.keypair.clone(),
(bank.tick_height() % bank.ticks_per_slot()) as u8,
self.shred_version,
)
.expect("Expected to create a new shredder");
let (data_shreds, coding_shreds, last_shred_index) = shredder.entries_to_shreds(
&receive_results.entries,
last_tick_height == bank.max_tick_height(),
self.next_shred_index,
);
let (duplicate_entries, next_duplicate_shred_index) =
self.queue_or_create_duplicate_entries(&bank, &receive_results);
let (duplicate_data_shreds, duplicate_coding_shreds, _) = if !duplicate_entries.is_empty() {
shredder.entries_to_shreds(
&duplicate_entries,
last_tick_height == bank.max_tick_height(),
next_duplicate_shred_index,
)
} else {
(vec![], vec![], 0)
};
// Manually track the shred index because relying on slot meta consumed is racy
if last_tick_height == bank.max_tick_height() {
self.next_shred_index = 0;
self.duplicate_queue
.register_hash(&self.last_duplicate_entry_hash, &FeeCalculator::default());
} else {
self.next_shred_index = last_shred_index;
}
// Partition network with duplicate and real shreds based on stake
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
let mut duplicate_recipients = HashMap::new();
let mut real_recipients = HashMap::new();
let mut stakes: Vec<(Pubkey, u64)> = bank
.epoch_staked_nodes(bank_epoch)
.unwrap()
.into_iter()
.filter(|(pubkey, _)| *pubkey != self.keypair.pubkey())
.collect();
stakes.sort_by(|(l_key, l_stake), (r_key, r_stake)| {
if r_stake == l_stake {
l_key.cmp(&r_key)
} else {
r_stake.cmp(&l_stake)
}
});
let highest_staked_node = stakes.first().cloned().map(|x| x.0);
let stake_total: u64 = stakes.iter().map(|(_, stake)| *stake).sum();
let mut cumulative_stake: u64 = 0;
for (pubkey, stake) in stakes.into_iter().rev() {
cumulative_stake += stake;
if (100 * cumulative_stake / stake_total) as u8 <= self.config.stake_partition {
duplicate_recipients.insert(pubkey, stake);
} else {
real_recipients.insert(pubkey, stake);
}
}
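// Illustration (using the stakes from the local-cluster test in this change): the
// faulty leader (stake 60) is filtered out, leaving stakes of 60 and 50 out of a
// total of 110. Walking from lowest to highest stake, the 50-stake node sits at
// 100 * 50 / 110 = 45% <= 50 and receives the duplicate version, while the
// 60-stake node falls into real_recipients.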
if let Some(highest_staked_node) = highest_staked_node {
if bank.slot() > MINIMUM_DUPLICATE_SLOT && last_tick_height == bank.max_tick_height() {
warn!(
"{} sent duplicate slot {} to nodes: {:?}",
self.keypair.pubkey(),
bank.slot(),
&duplicate_recipients,
);
warn!(
"Duplicate shreds for slot {} will be broadcast in {} slot(s)",
bank.slot(),
self.config.duplicate_send_delay
);
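// Queue the last shred of both the duplicate and the real version of the block for
// delayed delivery to the highest-staked node, so that it eventually sees both
// versions of this slot's last shred.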
let delayed_shreds: Option<Vec<Shred>> = vec![
duplicate_data_shreds.last().cloned(),
data_shreds.last().cloned(),
]
.into_iter()
.collect();
self.delayed_queue
.lock()
.unwrap()
.push_back((Some(highest_staked_node), delayed_shreds));
}
}
let duplicate_recipients = Arc::new(duplicate_recipients);
let real_recipients = Arc::new(real_recipients);
let data_shreds = Arc::new(data_shreds);
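// 2) Send the real data shreds to be recorded in the local blockstore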
blockstore_sender.send((data_shreds.clone(), None))?;
// 3) Start broadcast step
socket_sender.send((
(
Some(duplicate_recipients.clone()),
Arc::new(duplicate_data_shreds),
),
None,
))?;
socket_sender.send((
(
Some(duplicate_recipients),
Arc::new(duplicate_coding_shreds),
),
None,
))?;
socket_sender.send(((Some(real_recipients.clone()), data_shreds), None))?;
socket_sender.send(((Some(real_recipients), Arc::new(coding_shreds)), None))?;
Ok(())
}
fn transmit(
&mut self,
receiver: &Arc<Mutex<TransmitReceiver>>,
cluster_info: &ClusterInfo,
sock: &UdpSocket,
) -> Result<()> {
// Check the delay queue for shreds that are ready to be sent
let (delayed_recipient, delayed_shreds) = {
let mut delayed_deque = self.delayed_queue.lock().unwrap();
if delayed_deque.len() > self.config.duplicate_send_delay {
delayed_deque.pop_front().unwrap()
} else {
(None, None)
}
};
let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
let stakes = stakes.unwrap();
for peer in cluster_info.tvu_peers() {
// Forward shreds to circumvent gossip
if stakes.get(&peer.id).is_some() {
shreds.iter().for_each(|shred| {
sock.send_to(&shred.payload, &peer.tvu_forwards).unwrap();
});
}
// After a delay, broadcast duplicate shreds to a single node
if let Some(shreds) = delayed_shreds.as_ref() {
if Some(peer.id) == delayed_recipient {
shreds.iter().for_each(|shred| {
sock.send_to(&shred.payload, &peer.tvu).unwrap();
});
}
}
}
Ok(())
}
fn record(
&mut self,
receiver: &Arc<Mutex<RecordReceiver>>,
blockstore: &Arc<Blockstore>,
) -> Result<()> {
let (data_shreds, _) = receiver.lock().unwrap().recv()?;
blockstore.insert_shreds(data_shreds.to_vec(), None, true)?;
Ok(())
}
}

View File

@@ -280,18 +280,23 @@ pub fn check_for_new_roots(num_new_roots: usize, contact_infos: &[ContactInfo],
let mut last_print = Instant::now();
let loop_start = Instant::now();
let loop_timeout = Duration::from_secs(60);
let mut num_roots_map = HashMap::new();
while !done {
assert!(loop_start.elapsed() < loop_timeout);
for (i, ingress_node) in contact_infos.iter().enumerate() {
let client = create_client(ingress_node.client_facing_addr(), VALIDATOR_PORT_RANGE);
let slot = client.get_slot().unwrap_or(0);
roots[i].insert(slot);
let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0);
done = min_node >= num_new_roots;
let root_slot = client
.get_slot_with_commitment(CommitmentConfig::finalized())
.unwrap_or(0);
roots[i].insert(root_slot);
num_roots_map.insert(ingress_node.id, roots[i].len());
let num_roots = roots.iter().map(|r| r.len()).min().unwrap();
done = num_roots >= num_new_roots;
if done || last_print.elapsed().as_secs() > 3 {
info!(
"{} {} min observed roots {}/16",
test_name, ingress_node.id, min_node
"{} waiting for {} new roots.. observed: {:?}",
test_name, num_new_roots, num_roots_map
);
last_print = Instant::now();
}

View File

@@ -12,7 +12,7 @@ use solana_client::{
thin_client::{create_client, ThinClient},
};
use solana_core::{
broadcast_stage::BroadcastStageType,
broadcast_stage::{BroadcastDuplicatesConfig, BroadcastStageType},
consensus::{Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH},
optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
validator::ValidatorConfig,
@@ -1967,32 +1967,52 @@ fn test_fail_entry_verification_leader() {
}
#[test]
#[allow(unused_attributes)]
#[serial]
#[ignore]
#[allow(unused_attributes)]
fn test_fake_shreds_broadcast_leader() {
test_faulty_node(BroadcastStageType::BroadcastFakeShreds);
}
#[test]
#[serial]
#[ignore]
#[allow(unused_attributes)]
fn test_duplicate_shreds_broadcast_leader() {
test_faulty_node(BroadcastStageType::BroadcastDuplicates(
BroadcastDuplicatesConfig {
stake_partition: 50,
duplicate_send_delay: 1,
},
));
}
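// Note: both tests above are #[ignore]d, so they only run when requested explicitly,
// e.g. (assuming the local-cluster crate is named solana-local-cluster):
//   cargo test -p solana-local-cluster test_duplicate_shreds_broadcast_leader -- --ignored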
fn test_faulty_node(faulty_node_type: BroadcastStageType) {
solana_logger::setup_with_default(RUST_LOG_FILTER);
let num_nodes = 2;
solana_logger::setup_with_default("solana_local_cluster=info");
let num_nodes = 3;
let error_validator_config = ValidatorConfig {
broadcast_stage_type: faulty_node_type,
..ValidatorConfig::default()
};
let mut validator_configs = Vec::with_capacity(num_nodes - 1);
let mut validator_configs = Vec::with_capacity(num_nodes);
validator_configs.resize_with(num_nodes - 1, ValidatorConfig::default);
validator_configs.push(error_validator_config);
// Insert the faulty node as the bootstrap (first) validator
validator_configs.insert(0, error_validator_config);
let node_stakes = vec![300, 100];
let mut validator_keys = Vec::with_capacity(num_nodes);
validator_keys.resize_with(num_nodes, || (Arc::new(Keypair::new()), true));
let node_stakes = vec![60, 50, 60];
assert_eq!(node_stakes.len(), num_nodes);
assert_eq!(validator_keys.len(), num_nodes);
let mut cluster_config = ClusterConfig {
cluster_lamports: 10_000,
node_stakes,
validator_configs,
slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH * 2,
stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH * 2,
validator_keys: Some(validator_keys),
slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH * 2u64,
stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH * 2u64,
..ClusterConfig::default()
};

View File

@@ -127,7 +127,7 @@ impl BlockhashQueue {
.map(|(k, v)| recent_blockhashes::IterItem(v.hash_height, k, &v.fee_calculator))
}
pub fn len(&self) -> usize {
pub(crate) fn len(&self) -> usize {
self.max_age
}
}

View File

@@ -12,7 +12,7 @@ pub mod bank;
pub mod bank_client;
pub mod bank_forks;
pub mod bank_utils;
mod blockhash_queue;
pub mod blockhash_queue;
pub mod bloom;
pub mod builtins;
pub mod commitment;