Add local cluster test for optimistic confirmation with malformed votes (#29822)

Author: carllin (committed by GitHub)
Date: 2023-01-31 14:19:45 -06:00
Parent: eaf4e6f1a6
Commit: b345d97f67
4 changed files with 413 additions and 134 deletions


@ -253,6 +253,20 @@ impl ValidatorConfig {
..Self::default()
}
}
pub fn enable_default_rpc_block_subscribe(&mut self) {
let pubsub_config = PubSubConfig {
enable_block_subscription: true,
..PubSubConfig::default()
};
let rpc_config = JsonRpcConfig {
enable_rpc_transaction_history: true,
..JsonRpcConfig::default_for_test()
};
self.pubsub_config = pubsub_config;
self.rpc_config = rpc_config;
}
}
// `ValidatorStartProgress` contains status information that is surfaced to the node operator over
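
For reference, a minimal usage sketch of the new helper (not part of this diff), assuming the usual local-cluster test scaffolding (ClusterConfig, LocalCluster, DEFAULT_NODE_STAKE) that appears in the tests further down:

let mut validator_config = ValidatorConfig::default_for_test();
// Turns on the block-subscription pubsub endpoint plus RPC transaction history.
validator_config.enable_default_rpc_block_subscribe();
let mut config = ClusterConfig {
    cluster_lamports: 100 * DEFAULT_NODE_STAKE,
    node_stakes: vec![DEFAULT_NODE_STAKE],
    validator_configs: vec![validator_config],
    ..ClusterConfig::default()
};
let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);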


@ -10,13 +10,15 @@ use {
solana_core::consensus::VOTE_THRESHOLD_DEPTH,
solana_entry::entry::{Entry, EntrySlice},
solana_gossip::{
cluster_info,
crds_value::{self, CrdsData, CrdsValue},
cluster_info::{self, ClusterInfo},
crds::Cursor,
crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel},
gossip_error::GossipError,
gossip_service::discover_cluster,
gossip_service::{self, discover_cluster, GossipService},
legacy_contact_info::LegacyContactInfo as ContactInfo,
},
solana_ledger::blockstore::Blockstore,
solana_runtime::vote_transaction::VoteTransaction,
solana_sdk::{
client::SyncClient,
clock::{self, Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
@ -29,16 +31,20 @@ use {
signature::{Keypair, Signature, Signer},
system_transaction,
timing::{duration_as_ms, timestamp},
transaction::Transaction,
transport::TransportError,
},
solana_streamer::socket::SocketAddrSpace,
solana_vote_program::vote_transaction,
std::{
collections::{HashMap, HashSet},
net::{IpAddr, Ipv4Addr, SocketAddr},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener},
path::Path,
sync::{Arc, RwLock},
thread::sleep,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{sleep, JoinHandle},
time::{Duration, Instant},
},
};
@ -429,6 +435,102 @@ fn poll_all_nodes_for_signature(
Ok(())
}
pub struct GossipVoter {
pub gossip_service: GossipService,
pub tcp_listener: Option<TcpListener>,
pub cluster_info: Arc<ClusterInfo>,
pub t_voter: JoinHandle<()>,
pub exit: Arc<AtomicBool>,
}
impl GossipVoter {
pub fn close(self) {
self.exit.store(true, Ordering::Relaxed);
self.t_voter.join().unwrap();
self.gossip_service.join().unwrap();
}
}
/// Reads votes from gossip, filters them through `vote_filter`, and passes the surviving
/// votes to `process_vote_tx`, which can respond by crafting and submitting votes of its
/// own on behalf of the node with identity `node_keypair`. See the usage sketch below.
pub fn start_gossip_voter(
gossip_addr: &SocketAddr,
node_keypair: &Keypair,
vote_filter: impl Fn((CrdsValueLabel, Transaction)) -> Option<(VoteTransaction, Transaction)>
+ std::marker::Send
+ 'static,
mut process_vote_tx: impl FnMut(Slot, &Transaction, &VoteTransaction, &ClusterInfo)
+ std::marker::Send
+ 'static,
sleep_ms: u64,
) -> GossipVoter {
let exit = Arc::new(AtomicBool::new(false));
let (gossip_service, tcp_listener, cluster_info) = gossip_service::make_gossip_node(
// Need to use our validator's keypair to gossip EpochSlots and votes for our
// node later.
node_keypair.insecure_clone(),
Some(gossip_addr),
&exit,
None,
0,
false,
SocketAddrSpace::Unspecified,
);
let t_voter = {
let exit = exit.clone();
let cluster_info = cluster_info.clone();
std::thread::spawn(move || {
let mut cursor = Cursor::default();
loop {
if exit.load(Ordering::Relaxed) {
return;
}
let (labels, votes) = cluster_info.get_votes_with_labels(&mut cursor);
let mut parsed_vote_iter: Vec<_> = labels
.into_iter()
.zip(votes.into_iter())
.filter_map(&vote_filter)
.collect();
parsed_vote_iter.sort_by(|(vote, _), (vote2, _)| {
vote.last_voted_slot()
.unwrap()
.cmp(&vote2.last_voted_slot().unwrap())
});
for (parsed_vote, leader_vote_tx) in &parsed_vote_iter {
if let Some(latest_vote_slot) = parsed_vote.last_voted_slot() {
info!("received vote for {}", latest_vote_slot);
process_vote_tx(
latest_vote_slot,
leader_vote_tx,
parsed_vote,
&cluster_info,
)
}
// Give vote some time to propagate
sleep(Duration::from_millis(sleep_ms));
}
if parsed_vote_iter.is_empty() {
sleep(Duration::from_millis(sleep_ms));
}
}
})
};
GossipVoter {
gossip_service,
tcp_listener,
cluster_info,
t_voter,
exit,
}
}
fn get_and_verify_slot_entries(
blockstore: &Blockstore,
slot: Slot,
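
A minimal usage sketch of start_gossip_voter (not part of this diff), assuming `cluster`, `node_keypair`, and the `vote_parser`/`log` imports from the tests that use it below; this voter keeps every non-empty vote it observes and merely logs it:

let voter = cluster_tests::start_gossip_voter(
    &cluster.entry_point_info.gossip,
    &node_keypair,
    // vote_filter: keep any non-empty vote, regardless of sender
    |(_label, vote_tx)| {
        vote_parser::parse_vote_transaction(&vote_tx)
            .map(|(_, vote, ..)| vote)
            .filter(|vote| !vote.is_empty())
            .map(|vote| (vote, vote_tx))
    },
    // process_vote_tx: just log the latest voted slot
    |slot, _vote_tx, _parsed_vote, _cluster_info| info!("observed vote for slot {slot}"),
    // sleep_ms between gossip polls
    100,
);
// ... drive the test ...
voter.close(); // sets the exit flag, then joins the voter thread and gossip service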


@ -23,17 +23,21 @@ use {
},
solana_local_cluster::{
cluster::{Cluster, ClusterValidatorInfo},
cluster_tests,
cluster_tests::{self},
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::*,
},
solana_pubsub_client::pubsub_client::PubsubClient,
solana_rpc_client::rpc_client::RpcClient,
solana_rpc_client_api::{
config::{RpcProgramAccountsConfig, RpcSignatureSubscribeConfig},
config::{
RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcProgramAccountsConfig,
RpcSignatureSubscribeConfig,
},
response::RpcSignatureResult,
},
solana_runtime::{
commitment::VOTE_THRESHOLD_SIZE,
hardened_unpack::open_genesis_config,
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
@ -41,6 +45,7 @@ use {
snapshot_utils::{
self, create_accounts_run_and_snapshot_dirs, ArchiveFormat, SnapshotVersion,
},
vote_parser,
},
solana_sdk::{
account::AccountSharedData,
@ -56,7 +61,7 @@ use {
system_program, system_transaction,
},
solana_streamer::socket::SocketAddrSpace,
solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY,
solana_vote_program::{vote_state::MAX_LOCKOUT_HISTORY, vote_transaction},
std::{
collections::{HashMap, HashSet},
fs,
@ -64,7 +69,7 @@ use {
iter,
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
},
thread::{sleep, Builder, JoinHandle},
@ -2510,6 +2515,213 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {
);
}
#[test]
#[serial]
fn test_rpc_block_subscribe() {
let total_stake = 100 * DEFAULT_NODE_STAKE;
let leader_stake = total_stake;
let node_stakes = vec![leader_stake];
let mut validator_config = ValidatorConfig::default_for_test();
validator_config.enable_default_rpc_block_subscribe();
let validator_keys = vec![
"28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
]
.iter()
.map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
.take(node_stakes.len())
.collect::<Vec<_>>();
let mut config = ClusterConfig {
cluster_lamports: total_stake,
node_stakes,
validator_configs: vec![validator_config],
validator_keys: Some(validator_keys),
skip_warmup_slots: true,
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
let (mut block_subscribe_client, receiver) = PubsubClient::block_subscribe(
&format!("ws://{}", &cluster.entry_point_info.rpc_pubsub.to_string()),
RpcBlockSubscribeFilter::All,
Some(RpcBlockSubscribeConfig {
commitment: Some(CommitmentConfig::confirmed()),
encoding: None,
transaction_details: None,
show_rewards: None,
max_supported_transaction_version: None,
}),
)
.unwrap();
let mut received_block = false;
let max_wait = 10_000;
let start = Instant::now();
while !received_block {
assert!(
start.elapsed() <= Duration::from_millis(max_wait),
"Went too long {max_wait} ms without receiving a confirmed block",
);
let responses: Vec<_> = receiver.try_iter().collect();
// Wait for a response
if !responses.is_empty() {
for response in responses {
assert!(response.value.err.is_none());
assert!(response.value.block.is_some());
if response.value.slot > 1 {
received_block = true;
}
}
}
sleep(Duration::from_millis(100));
}
// If we don't drop the cluster, the blocking web socket service
// won't return, and the `block_subscribe_client` won't shut down
drop(cluster);
block_subscribe_client.shutdown().unwrap();
}
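
The wait loop above follows a drain-then-sleep pattern against the subscription's receiver; a generic sketch of that pattern as a hypothetical helper (not in this diff):

/// Repeatedly drain pending items and return true as soon as one satisfies `pred`,
/// or false once `timeout` elapses.
fn poll_until<T>(
    mut drain: impl FnMut() -> Vec<T>,
    mut pred: impl FnMut(&T) -> bool,
    timeout: std::time::Duration,
) -> bool {
    let start = std::time::Instant::now();
    while start.elapsed() <= timeout {
        if drain().iter().any(|item| pred(item)) {
            return true;
        }
        std::thread::sleep(std::time::Duration::from_millis(100));
    }
    false
}

// e.g. with the block subscription above:
// let got_block = poll_until(
//     || receiver.try_iter().collect(),
//     |response| response.value.block.is_some() && response.value.slot > 1,
//     std::time::Duration::from_millis(10_000),
// );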
#[test]
#[serial]
#[allow(unused_attributes)]
fn test_oc_bad_signatures() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
let total_stake = 100 * DEFAULT_NODE_STAKE;
let leader_stake = (total_stake as f64 * VOTE_THRESHOLD_SIZE) as u64;
let our_node_stake = total_stake - leader_stake;
let node_stakes = vec![leader_stake, our_node_stake];
let mut validator_config = ValidatorConfig {
require_tower: true,
..ValidatorConfig::default_for_test()
};
validator_config.enable_default_rpc_block_subscribe();
let validator_keys = vec![
"28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
"2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8",
]
.iter()
.map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
.take(node_stakes.len())
.collect::<Vec<_>>();
let our_id = validator_keys.last().unwrap().0.pubkey();
let mut config = ClusterConfig {
cluster_lamports: total_stake,
node_stakes,
validator_configs: make_identical_validator_configs(&validator_config, 2),
validator_keys: Some(validator_keys),
skip_warmup_slots: true,
..ClusterConfig::default()
};
let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
// 2) Kill our node and start up a thread to simulate votes to control our voting behavior
let our_info = cluster.exit_node(&our_id);
let node_keypair = our_info.info.keypair;
let vote_keypair = our_info.info.voting_keypair;
info!(
"our node id: {}, vote id: {}",
node_keypair.pubkey(),
vote_keypair.pubkey()
);
// 3) Start up a spy to listen for and push votes to leader TPU
let (rpc, tpu) = cluster_tests::get_client_facing_addr(&cluster.entry_point_info);
let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone());
let cluster_funding_keypair = cluster.funding_keypair.insecure_clone();
let voter_thread_sleep_ms: usize = 100;
let num_votes_simulated = Arc::new(AtomicUsize::new(0));
let gossip_voter = cluster_tests::start_gossip_voter(
&cluster.entry_point_info.gossip,
&node_keypair,
|(_label, leader_vote_tx)| {
let vote = vote_parser::parse_vote_transaction(&leader_vote_tx)
.map(|(_, vote, ..)| vote)
.unwrap();
// Filter out empty votes
if !vote.is_empty() {
Some((vote, leader_vote_tx))
} else {
None
}
},
{
let node_keypair = node_keypair.insecure_clone();
let vote_keypair = vote_keypair.insecure_clone();
let num_votes_simulated = num_votes_simulated.clone();
move |vote_slot, leader_vote_tx, parsed_vote, _cluster_info| {
info!("received vote for {}", vote_slot);
let vote_hash = parsed_vote.hash();
info!(
"Simulating vote from our node on slot {}, hash {}",
vote_slot, vote_hash
);
// Add all recent vote slots on this fork to allow cluster to pass
// vote threshold checks in replay. Note this will instantly force a
// root by this validator.
let vote_slots: Vec<Slot> = vec![vote_slot];
let bad_authorized_signer_keypair = Keypair::new();
let mut vote_tx = vote_transaction::new_vote_transaction(
vote_slots,
vote_hash,
leader_vote_tx.message.recent_blockhash,
&node_keypair,
&vote_keypair,
// Make a bad signer
&bad_authorized_signer_keypair,
None,
);
client
.retry_transfer(&cluster_funding_keypair, &mut vote_tx, 5)
.unwrap();
num_votes_simulated.fetch_add(1, Ordering::Relaxed);
}
},
voter_thread_sleep_ms as u64,
);
let (mut block_subscribe_client, receiver) = PubsubClient::block_subscribe(
&format!("ws://{}", &cluster.entry_point_info.rpc_pubsub.to_string()),
RpcBlockSubscribeFilter::All,
Some(RpcBlockSubscribeConfig {
commitment: Some(CommitmentConfig::confirmed()),
encoding: None,
transaction_details: None,
show_rewards: None,
max_supported_transaction_version: None,
}),
)
.unwrap();
const MAX_VOTES_TO_SIMULATE: usize = 10;
// Make sure test doesn't take too long
assert!(voter_thread_sleep_ms * MAX_VOTES_TO_SIMULATE <= 1000);
loop {
let responses: Vec<_> = receiver.try_iter().collect();
// Nothing should get optimistically confirmed or rooted
assert!(responses.is_empty());
// Wait for the voter thread to attempt a sufficient number of votes, giving
// any violation a chance to occur
if num_votes_simulated.load(Ordering::Relaxed) > MAX_VOTES_TO_SIMULATE {
break;
}
sleep(Duration::from_millis(100));
}
// Clean up voter thread
gossip_voter.close();
// If we don't drop the cluster, the blocking web socket service
// won't return, and the `block_subscribe_client` won't shut down
drop(cluster);
block_subscribe_client.shutdown().unwrap();
}
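
For contrast, a sketch (not part of this diff) of the well-formed vote the closure above could have built instead; the only difference from the malformed one is that the vote account's real authorized voter signs, so this vote would count toward optimistic confirmation. `vote_slot`, `vote_hash`, `leader_vote_tx`, and the keypairs are the same values used inside the closure:

let good_vote_tx = vote_transaction::new_vote_transaction(
    vec![vote_slot],                         // slots being voted on
    vote_hash,                               // bank hash of the voted slot
    leader_vote_tx.message.recent_blockhash, // reuse the leader's recent blockhash
    &node_keypair,                           // node identity (fee payer)
    &vote_keypair,                           // vote account
    &vote_keypair,                           // authorized voter actually matches
    None,                                    // no switch proof hash
);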
#[test]
#[serial]
#[ignore]


@ -13,14 +13,11 @@ use {
replay_stage::DUPLICATE_THRESHOLD,
validator::ValidatorConfig,
},
solana_gossip::{
crds::Cursor,
gossip_service::{self, discover_cluster},
},
solana_gossip::gossip_service::discover_cluster,
solana_ledger::ancestor_iterator::AncestorIterator,
solana_local_cluster::{
cluster::{Cluster, ClusterValidatorInfo},
cluster_tests,
cluster_tests::{self},
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::*,
},
@ -37,10 +34,6 @@ use {
std::{
collections::{BTreeSet, HashSet},
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::sleep,
time::Duration,
},
@ -459,129 +452,89 @@ fn test_duplicate_shreds_broadcast_leader() {
let bad_leader_ledger_path = cluster.validators[&bad_leader_id].info.ledger_path.clone();
info!("our node id: {}", node_keypair.pubkey());
// 3) Start up a spy to listen for votes
let exit = Arc::new(AtomicBool::new(false));
let (gossip_service, _tcp_listener, cluster_info) = gossip_service::make_gossip_node(
// Need to use our validator's keypair to gossip EpochSlots and votes for our
// node later.
node_keypair.insecure_clone(),
Some(&cluster.entry_point_info.gossip),
&exit,
None,
0,
false,
SocketAddrSpace::Unspecified,
);
let t_voter = {
let exit = exit.clone();
std::thread::spawn(move || {
let mut cursor = Cursor::default();
// 3) Start up a gossip instance to listen for and push votes
let voter_thread_sleep_ms = 100;
let gossip_voter = cluster_tests::start_gossip_voter(
&cluster.entry_point_info.gossip,
&node_keypair,
move |(label, leader_vote_tx)| {
// Filter out votes not from the bad leader
if label.pubkey() == bad_leader_id {
let vote = vote_parser::parse_vote_transaction(&leader_vote_tx)
.map(|(_, vote, ..)| vote)
.unwrap();
// Filter out empty votes
if !vote.is_empty() {
Some((vote, leader_vote_tx))
} else {
None
}
} else {
None
}
},
{
let node_keypair = node_keypair.insecure_clone();
let vote_keypair = vote_keypair.insecure_clone();
let mut max_vote_slot = 0;
let mut gossip_vote_index = 0;
loop {
if exit.load(Ordering::Relaxed) {
return;
move |latest_vote_slot, leader_vote_tx, parsed_vote, cluster_info| {
info!("received vote for {}", latest_vote_slot);
// Add to EpochSlots. Mark all slots frozen between slot..=max_vote_slot.
if latest_vote_slot > max_vote_slot {
let new_epoch_slots: Vec<Slot> =
(max_vote_slot + 1..latest_vote_slot + 1).collect();
info!(
"Simulating epoch slots from our node: {:?}",
new_epoch_slots
);
cluster_info.push_epoch_slots(&new_epoch_slots);
max_vote_slot = latest_vote_slot;
}
let (labels, votes) = cluster_info.get_votes_with_labels(&mut cursor);
let mut parsed_vote_iter: Vec<_> = labels
.into_iter()
.zip(votes.into_iter())
.filter_map(|(label, leader_vote_tx)| {
// Filter out votes not from the bad leader
if label.pubkey() == bad_leader_id {
let vote = vote_parser::parse_vote_transaction(&leader_vote_tx)
.map(|(_, vote, ..)| vote)
.unwrap();
// Filter out empty votes
if !vote.is_empty() {
Some((vote, leader_vote_tx))
} else {
None
}
} else {
None
}
})
.collect();
// Only vote on even slots. Note this may violate lockouts if the
// validator started voting on a different fork before we could exit
// it above.
let vote_hash = parsed_vote.hash();
if latest_vote_slot % 2 == 0 {
info!(
"Simulating vote from our node on slot {}, hash {}",
latest_vote_slot, vote_hash
);
parsed_vote_iter.sort_by(|(vote, _), (vote2, _)| {
vote.last_voted_slot()
.unwrap()
.cmp(&vote2.last_voted_slot().unwrap())
});
for (parsed_vote, leader_vote_tx) in &parsed_vote_iter {
if let Some(latest_vote_slot) = parsed_vote.last_voted_slot() {
info!("received vote for {}", latest_vote_slot);
// Add to EpochSlots. Mark all slots frozen between slot..=max_vote_slot.
if latest_vote_slot > max_vote_slot {
let new_epoch_slots: Vec<Slot> =
(max_vote_slot + 1..latest_vote_slot + 1).collect();
info!(
"Simulating epoch slots from our node: {:?}",
new_epoch_slots
);
cluster_info.push_epoch_slots(&new_epoch_slots);
max_vote_slot = latest_vote_slot;
}
// Only vote on even slots. Note this may violate lockouts if the
// validator started voting on a different fork before we could exit
// it above.
let vote_hash = parsed_vote.hash();
if latest_vote_slot % 2 == 0 {
info!(
"Simulating vote from our node on slot {}, hash {}",
latest_vote_slot, vote_hash
);
// Add all recent vote slots on this fork to allow cluster to pass
// vote threshold checks in replay. Note this will instantly force a
// root by this validator, but we're not concerned with lockout violations
// by this validator so it's fine.
let leader_blockstore = open_blockstore(&bad_leader_ledger_path);
let mut vote_slots: Vec<(Slot, u32)> = AncestorIterator::new_inclusive(
latest_vote_slot,
&leader_blockstore,
)
// Add all recent vote slots on this fork to allow cluster to pass
// vote threshold checks in replay. Note this will instantly force a
// root by this validator, but we're not concerned with lockout violations
// by this validator so it's fine.
let leader_blockstore = open_blockstore(&bad_leader_ledger_path);
let mut vote_slots: Vec<(Slot, u32)> =
AncestorIterator::new_inclusive(latest_vote_slot, &leader_blockstore)
.take(MAX_LOCKOUT_HISTORY)
.zip(1..)
.collect();
vote_slots.reverse();
let mut vote = VoteStateUpdate::from(vote_slots);
let root = AncestorIterator::new_inclusive(
latest_vote_slot,
&leader_blockstore,
)
vote_slots.reverse();
let mut vote = VoteStateUpdate::from(vote_slots);
let root =
AncestorIterator::new_inclusive(latest_vote_slot, &leader_blockstore)
.nth(MAX_LOCKOUT_HISTORY);
vote.root = root;
vote.hash = vote_hash;
let vote_tx =
vote_transaction::new_compact_vote_state_update_transaction(
vote,
leader_vote_tx.message.recent_blockhash,
&node_keypair,
&vote_keypair,
&vote_keypair,
None,
);
gossip_vote_index += 1;
gossip_vote_index %= MAX_LOCKOUT_HISTORY;
cluster_info.push_vote_at_index(vote_tx, gossip_vote_index as u8)
}
}
// Give vote some time to propagate
sleep(Duration::from_millis(100));
}
if parsed_vote_iter.is_empty() {
sleep(Duration::from_millis(100));
vote.root = root;
vote.hash = vote_hash;
let vote_tx = vote_transaction::new_compact_vote_state_update_transaction(
vote,
leader_vote_tx.message.recent_blockhash,
&node_keypair,
&vote_keypair,
&vote_keypair,
None,
);
gossip_vote_index += 1;
gossip_vote_index %= MAX_LOCKOUT_HISTORY;
cluster_info.push_vote_at_index(vote_tx, gossip_vote_index as u8)
}
}
})
};
},
voter_thread_sleep_ms as u64,
);
// 4) Check that the cluster is making progress
cluster.check_for_new_roots(
@ -591,9 +544,7 @@ fn test_duplicate_shreds_broadcast_leader() {
);
// Clean up threads
exit.store(true, Ordering::Relaxed);
t_voter.join().unwrap();
gossip_service.join().unwrap();
gossip_voter.close();
}
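
The refactor leaves the closure's slot bookkeeping intact; as a reading aid, a sketch (not in this diff) of that bookkeeping pulled out as a standalone function. Given the highest slot already advertised via EpochSlots and a newly observed vote slot, it returns the slots to advertise next and whether this slot should also be voted on (even slots only); the caller would then update its running maximum:

fn plan_gossip_update(max_vote_slot: u64, latest_vote_slot: u64) -> (Vec<u64>, bool) {
    // Slots to newly mark in EpochSlots: everything between the previously
    // advertised maximum (exclusive) and the latest vote slot (inclusive).
    let new_epoch_slots: Vec<u64> = if latest_vote_slot > max_vote_slot {
        (max_vote_slot + 1..=latest_vote_slot).collect()
    } else {
        Vec::new()
    };
    // The spy only simulates a vote on even slots.
    (new_epoch_slots, latest_vote_slot % 2 == 0)
}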
#[test]