diff --git a/core/src/validator.rs b/core/src/validator.rs index f52dd56df..fd4784345 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -253,6 +253,20 @@ impl ValidatorConfig { ..Self::default() } } + + pub fn enable_default_rpc_block_subscribe(&mut self) { + let pubsub_config = PubSubConfig { + enable_block_subscription: true, + ..PubSubConfig::default() + }; + let rpc_config = JsonRpcConfig { + enable_rpc_transaction_history: true, + ..JsonRpcConfig::default_for_test() + }; + + self.pubsub_config = pubsub_config; + self.rpc_config = rpc_config; + } } // `ValidatorStartProgress` contains status information that is surfaced to the node operator over diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index 7bad642bf..f0448eb84 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -10,13 +10,15 @@ use { solana_core::consensus::VOTE_THRESHOLD_DEPTH, solana_entry::entry::{Entry, EntrySlice}, solana_gossip::{ - cluster_info, - crds_value::{self, CrdsData, CrdsValue}, + cluster_info::{self, ClusterInfo}, + crds::Cursor, + crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel}, gossip_error::GossipError, - gossip_service::discover_cluster, + gossip_service::{self, discover_cluster, GossipService}, legacy_contact_info::LegacyContactInfo as ContactInfo, }, solana_ledger::blockstore::Blockstore, + solana_runtime::vote_transaction::VoteTransaction, solana_sdk::{ client::SyncClient, clock::{self, Slot, NUM_CONSECUTIVE_LEADER_SLOTS}, @@ -29,16 +31,20 @@ use { signature::{Keypair, Signature, Signer}, system_transaction, timing::{duration_as_ms, timestamp}, + transaction::Transaction, transport::TransportError, }, solana_streamer::socket::SocketAddrSpace, solana_vote_program::vote_transaction, std::{ collections::{HashMap, HashSet}, - net::{IpAddr, Ipv4Addr, SocketAddr}, + net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener}, path::Path, - sync::{Arc, RwLock}, - thread::sleep, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread::{sleep, JoinHandle}, time::{Duration, Instant}, }, }; @@ -429,6 +435,102 @@ fn poll_all_nodes_for_signature( Ok(()) } +pub struct GossipVoter { + pub gossip_service: GossipService, + pub tcp_listener: Option, + pub cluster_info: Arc, + pub t_voter: JoinHandle<()>, + pub exit: Arc, +} + +impl GossipVoter { + pub fn close(self) { + self.exit.store(true, Ordering::Relaxed); + self.t_voter.join().unwrap(); + self.gossip_service.join().unwrap(); + } +} + +/// Reads votes from gossip and runs them through `vote_filter` to filter votes that then +/// get passed to `generate_vote_tx` to create votes that are then pushed into gossip as if +/// sent by a node with identity `node_keypair`. +pub fn start_gossip_voter( + gossip_addr: &SocketAddr, + node_keypair: &Keypair, + vote_filter: impl Fn((CrdsValueLabel, Transaction)) -> Option<(VoteTransaction, Transaction)> + + std::marker::Send + + 'static, + mut process_vote_tx: impl FnMut(Slot, &Transaction, &VoteTransaction, &ClusterInfo) + + std::marker::Send + + 'static, + sleep_ms: u64, +) -> GossipVoter { + let exit = Arc::new(AtomicBool::new(false)); + let (gossip_service, tcp_listener, cluster_info) = gossip_service::make_gossip_node( + // Need to use our validator's keypair to gossip EpochSlots and votes for our + // node later. + node_keypair.insecure_clone(), + Some(gossip_addr), + &exit, + None, + 0, + false, + SocketAddrSpace::Unspecified, + ); + + let t_voter = { + let exit = exit.clone(); + let cluster_info = cluster_info.clone(); + std::thread::spawn(move || { + let mut cursor = Cursor::default(); + loop { + if exit.load(Ordering::Relaxed) { + return; + } + + let (labels, votes) = cluster_info.get_votes_with_labels(&mut cursor); + let mut parsed_vote_iter: Vec<_> = labels + .into_iter() + .zip(votes.into_iter()) + .filter_map(&vote_filter) + .collect(); + + parsed_vote_iter.sort_by(|(vote, _), (vote2, _)| { + vote.last_voted_slot() + .unwrap() + .cmp(&vote2.last_voted_slot().unwrap()) + }); + + for (parsed_vote, leader_vote_tx) in &parsed_vote_iter { + if let Some(latest_vote_slot) = parsed_vote.last_voted_slot() { + info!("received vote for {}", latest_vote_slot); + process_vote_tx( + latest_vote_slot, + leader_vote_tx, + parsed_vote, + &cluster_info, + ) + } + // Give vote some time to propagate + sleep(Duration::from_millis(sleep_ms)); + } + + if parsed_vote_iter.is_empty() { + sleep(Duration::from_millis(sleep_ms)); + } + } + }) + }; + + GossipVoter { + gossip_service, + tcp_listener, + cluster_info, + t_voter, + exit, + } +} + fn get_and_verify_slot_entries( blockstore: &Blockstore, slot: Slot, diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 96dec5c88..e5e7e4747 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -23,17 +23,21 @@ use { }, solana_local_cluster::{ cluster::{Cluster, ClusterValidatorInfo}, - cluster_tests, + cluster_tests::{self}, local_cluster::{ClusterConfig, LocalCluster}, validator_configs::*, }, solana_pubsub_client::pubsub_client::PubsubClient, solana_rpc_client::rpc_client::RpcClient, solana_rpc_client_api::{ - config::{RpcProgramAccountsConfig, RpcSignatureSubscribeConfig}, + config::{ + RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcProgramAccountsConfig, + RpcSignatureSubscribeConfig, + }, response::RpcSignatureResult, }, solana_runtime::{ + commitment::VOTE_THRESHOLD_SIZE, hardened_unpack::open_genesis_config, snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, @@ -41,6 +45,7 @@ use { snapshot_utils::{ self, create_accounts_run_and_snapshot_dirs, ArchiveFormat, SnapshotVersion, }, + vote_parser, }, solana_sdk::{ account::AccountSharedData, @@ -56,7 +61,7 @@ use { system_program, system_transaction, }, solana_streamer::socket::SocketAddrSpace, - solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY, + solana_vote_program::{vote_state::MAX_LOCKOUT_HISTORY, vote_transaction}, std::{ collections::{HashMap, HashSet}, fs, @@ -64,7 +69,7 @@ use { iter, path::Path, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Mutex, }, thread::{sleep, Builder, JoinHandle}, @@ -2510,6 +2515,213 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) { ); } +#[test] +#[serial] +fn test_rpc_block_subscribe() { + let total_stake = 100 * DEFAULT_NODE_STAKE; + let leader_stake = total_stake; + let node_stakes = vec![leader_stake]; + let mut validator_config = ValidatorConfig::default_for_test(); + validator_config.enable_default_rpc_block_subscribe(); + + let validator_keys = vec![ + "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4", + ] + .iter() + .map(|s| (Arc::new(Keypair::from_base58_string(s)), true)) + .take(node_stakes.len()) + .collect::>(); + + let mut config = ClusterConfig { + cluster_lamports: total_stake, + node_stakes, + validator_configs: vec![validator_config], + validator_keys: Some(validator_keys), + skip_warmup_slots: true, + ..ClusterConfig::default() + }; + let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); + let (mut block_subscribe_client, receiver) = PubsubClient::block_subscribe( + &format!("ws://{}", &cluster.entry_point_info.rpc_pubsub.to_string()), + RpcBlockSubscribeFilter::All, + Some(RpcBlockSubscribeConfig { + commitment: Some(CommitmentConfig::confirmed()), + encoding: None, + transaction_details: None, + show_rewards: None, + max_supported_transaction_version: None, + }), + ) + .unwrap(); + + let mut received_block = false; + let max_wait = 10_000; + let start = Instant::now(); + while !received_block { + assert!( + start.elapsed() <= Duration::from_millis(max_wait), + "Went too long {max_wait} ms without receiving a confirmed block", + ); + let responses: Vec<_> = receiver.try_iter().collect(); + // Wait for a response + if !responses.is_empty() { + for response in responses { + assert!(response.value.err.is_none()); + assert!(response.value.block.is_some()); + if response.value.slot > 1 { + received_block = true; + } + } + } + sleep(Duration::from_millis(100)); + } + + // If we don't drop the cluster, the blocking web socket service + // won't return, and the `block_subscribe_client` won't shut down + drop(cluster); + block_subscribe_client.shutdown().unwrap(); +} + +#[test] +#[serial] +#[allow(unused_attributes)] +fn test_oc_bad_signatures() { + solana_logger::setup_with_default(RUST_LOG_FILTER); + + let total_stake = 100 * DEFAULT_NODE_STAKE; + let leader_stake = (total_stake as f64 * VOTE_THRESHOLD_SIZE) as u64; + let our_node_stake = total_stake - leader_stake; + let node_stakes = vec![leader_stake, our_node_stake]; + let mut validator_config = ValidatorConfig { + require_tower: true, + ..ValidatorConfig::default_for_test() + }; + validator_config.enable_default_rpc_block_subscribe(); + let validator_keys = vec![ + "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4", + "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8", + ] + .iter() + .map(|s| (Arc::new(Keypair::from_base58_string(s)), true)) + .take(node_stakes.len()) + .collect::>(); + + let our_id = validator_keys.last().unwrap().0.pubkey(); + let mut config = ClusterConfig { + cluster_lamports: total_stake, + node_stakes, + validator_configs: make_identical_validator_configs(&validator_config, 2), + validator_keys: Some(validator_keys), + skip_warmup_slots: true, + ..ClusterConfig::default() + }; + let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); + + // 2) Kill our node and start up a thread to simulate votes to control our voting behavior + let our_info = cluster.exit_node(&our_id); + let node_keypair = our_info.info.keypair; + let vote_keypair = our_info.info.voting_keypair; + info!( + "our node id: {}, vote id: {}", + node_keypair.pubkey(), + vote_keypair.pubkey() + ); + + // 3) Start up a spy to listen for and push votes to leader TPU + let (rpc, tpu) = cluster_tests::get_client_facing_addr(&cluster.entry_point_info); + let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); + let cluster_funding_keypair = cluster.funding_keypair.insecure_clone(); + let voter_thread_sleep_ms: usize = 100; + let num_votes_simulated = Arc::new(AtomicUsize::new(0)); + let gossip_voter = cluster_tests::start_gossip_voter( + &cluster.entry_point_info.gossip, + &node_keypair, + |(_label, leader_vote_tx)| { + let vote = vote_parser::parse_vote_transaction(&leader_vote_tx) + .map(|(_, vote, ..)| vote) + .unwrap(); + // Filter out empty votes + if !vote.is_empty() { + Some((vote, leader_vote_tx)) + } else { + None + } + }, + { + let node_keypair = node_keypair.insecure_clone(); + let vote_keypair = vote_keypair.insecure_clone(); + let num_votes_simulated = num_votes_simulated.clone(); + move |vote_slot, leader_vote_tx, parsed_vote, _cluster_info| { + info!("received vote for {}", vote_slot); + let vote_hash = parsed_vote.hash(); + info!( + "Simulating vote from our node on slot {}, hash {}", + vote_slot, vote_hash + ); + + // Add all recent vote slots on this fork to allow cluster to pass + // vote threshold checks in replay. Note this will instantly force a + // root by this validator. + let vote_slots: Vec = vec![vote_slot]; + + let bad_authorized_signer_keypair = Keypair::new(); + let mut vote_tx = vote_transaction::new_vote_transaction( + vote_slots, + vote_hash, + leader_vote_tx.message.recent_blockhash, + &node_keypair, + &vote_keypair, + // Make a bad signer + &bad_authorized_signer_keypair, + None, + ); + client + .retry_transfer(&cluster_funding_keypair, &mut vote_tx, 5) + .unwrap(); + + num_votes_simulated.fetch_add(1, Ordering::Relaxed); + } + }, + voter_thread_sleep_ms as u64, + ); + + let (mut block_subscribe_client, receiver) = PubsubClient::block_subscribe( + &format!("ws://{}", &cluster.entry_point_info.rpc_pubsub.to_string()), + RpcBlockSubscribeFilter::All, + Some(RpcBlockSubscribeConfig { + commitment: Some(CommitmentConfig::confirmed()), + encoding: None, + transaction_details: None, + show_rewards: None, + max_supported_transaction_version: None, + }), + ) + .unwrap(); + + const MAX_VOTES_TO_SIMULATE: usize = 10; + // Make sure test doesn't take too long + assert!(voter_thread_sleep_ms * MAX_VOTES_TO_SIMULATE <= 1000); + loop { + let responses: Vec<_> = receiver.try_iter().collect(); + // Nothing should get optimistically confirmed or rooted + assert!(responses.is_empty()); + // Wait for the voter thread to attempt sufficient number of votes to give + // a chance for the violation to occur + if num_votes_simulated.load(Ordering::Relaxed) > MAX_VOTES_TO_SIMULATE { + break; + } + sleep(Duration::from_millis(100)); + } + + // Clean up voter thread + gossip_voter.close(); + + // If we don't drop the cluster, the blocking web socket service + // won't return, and the `block_subscribe_client` won't shut down + drop(cluster); + block_subscribe_client.shutdown().unwrap(); +} + #[test] #[serial] #[ignore] diff --git a/local-cluster/tests/local_cluster_slow_1.rs b/local-cluster/tests/local_cluster_slow_1.rs index d947400ce..8a2f036f6 100644 --- a/local-cluster/tests/local_cluster_slow_1.rs +++ b/local-cluster/tests/local_cluster_slow_1.rs @@ -13,14 +13,11 @@ use { replay_stage::DUPLICATE_THRESHOLD, validator::ValidatorConfig, }, - solana_gossip::{ - crds::Cursor, - gossip_service::{self, discover_cluster}, - }, + solana_gossip::gossip_service::discover_cluster, solana_ledger::ancestor_iterator::AncestorIterator, solana_local_cluster::{ cluster::{Cluster, ClusterValidatorInfo}, - cluster_tests, + cluster_tests::{self}, local_cluster::{ClusterConfig, LocalCluster}, validator_configs::*, }, @@ -37,10 +34,6 @@ use { std::{ collections::{BTreeSet, HashSet}, path::Path, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, thread::sleep, time::Duration, }, @@ -459,129 +452,89 @@ fn test_duplicate_shreds_broadcast_leader() { let bad_leader_ledger_path = cluster.validators[&bad_leader_id].info.ledger_path.clone(); info!("our node id: {}", node_keypair.pubkey()); - // 3) Start up a spy to listen for votes - let exit = Arc::new(AtomicBool::new(false)); - let (gossip_service, _tcp_listener, cluster_info) = gossip_service::make_gossip_node( - // Need to use our validator's keypair to gossip EpochSlots and votes for our - // node later. - node_keypair.insecure_clone(), - Some(&cluster.entry_point_info.gossip), - &exit, - None, - 0, - false, - SocketAddrSpace::Unspecified, - ); - - let t_voter = { - let exit = exit.clone(); - std::thread::spawn(move || { - let mut cursor = Cursor::default(); + // 3) Start up a gossip instance to listen for and push votes + let voter_thread_sleep_ms = 100; + let gossip_voter = cluster_tests::start_gossip_voter( + &cluster.entry_point_info.gossip, + &node_keypair, + move |(label, leader_vote_tx)| { + // Filter out votes not from the bad leader + if label.pubkey() == bad_leader_id { + let vote = vote_parser::parse_vote_transaction(&leader_vote_tx) + .map(|(_, vote, ..)| vote) + .unwrap(); + // Filter out empty votes + if !vote.is_empty() { + Some((vote, leader_vote_tx)) + } else { + None + } + } else { + None + } + }, + { + let node_keypair = node_keypair.insecure_clone(); + let vote_keypair = vote_keypair.insecure_clone(); let mut max_vote_slot = 0; let mut gossip_vote_index = 0; - loop { - if exit.load(Ordering::Relaxed) { - return; + move |latest_vote_slot, leader_vote_tx, parsed_vote, cluster_info| { + info!("received vote for {}", latest_vote_slot); + // Add to EpochSlots. Mark all slots frozen between slot..=max_vote_slot. + if latest_vote_slot > max_vote_slot { + let new_epoch_slots: Vec = + (max_vote_slot + 1..latest_vote_slot + 1).collect(); + info!( + "Simulating epoch slots from our node: {:?}", + new_epoch_slots + ); + cluster_info.push_epoch_slots(&new_epoch_slots); + max_vote_slot = latest_vote_slot; } - let (labels, votes) = cluster_info.get_votes_with_labels(&mut cursor); - let mut parsed_vote_iter: Vec<_> = labels - .into_iter() - .zip(votes.into_iter()) - .filter_map(|(label, leader_vote_tx)| { - // Filter out votes not from the bad leader - if label.pubkey() == bad_leader_id { - let vote = vote_parser::parse_vote_transaction(&leader_vote_tx) - .map(|(_, vote, ..)| vote) - .unwrap(); - // Filter out empty votes - if !vote.is_empty() { - Some((vote, leader_vote_tx)) - } else { - None - } - } else { - None - } - }) - .collect(); + // Only vote on even slots. Note this may violate lockouts if the + // validator started voting on a different fork before we could exit + // it above. + let vote_hash = parsed_vote.hash(); + if latest_vote_slot % 2 == 0 { + info!( + "Simulating vote from our node on slot {}, hash {}", + latest_vote_slot, vote_hash + ); - parsed_vote_iter.sort_by(|(vote, _), (vote2, _)| { - vote.last_voted_slot() - .unwrap() - .cmp(&vote2.last_voted_slot().unwrap()) - }); - - for (parsed_vote, leader_vote_tx) in &parsed_vote_iter { - if let Some(latest_vote_slot) = parsed_vote.last_voted_slot() { - info!("received vote for {}", latest_vote_slot); - // Add to EpochSlots. Mark all slots frozen between slot..=max_vote_slot. - if latest_vote_slot > max_vote_slot { - let new_epoch_slots: Vec = - (max_vote_slot + 1..latest_vote_slot + 1).collect(); - info!( - "Simulating epoch slots from our node: {:?}", - new_epoch_slots - ); - cluster_info.push_epoch_slots(&new_epoch_slots); - max_vote_slot = latest_vote_slot; - } - - // Only vote on even slots. Note this may violate lockouts if the - // validator started voting on a different fork before we could exit - // it above. - let vote_hash = parsed_vote.hash(); - if latest_vote_slot % 2 == 0 { - info!( - "Simulating vote from our node on slot {}, hash {}", - latest_vote_slot, vote_hash - ); - - // Add all recent vote slots on this fork to allow cluster to pass - // vote threshold checks in replay. Note this will instantly force a - // root by this validator, but we're not concerned with lockout violations - // by this validator so it's fine. - let leader_blockstore = open_blockstore(&bad_leader_ledger_path); - let mut vote_slots: Vec<(Slot, u32)> = AncestorIterator::new_inclusive( - latest_vote_slot, - &leader_blockstore, - ) + // Add all recent vote slots on this fork to allow cluster to pass + // vote threshold checks in replay. Note this will instantly force a + // root by this validator, but we're not concerned with lockout violations + // by this validator so it's fine. + let leader_blockstore = open_blockstore(&bad_leader_ledger_path); + let mut vote_slots: Vec<(Slot, u32)> = + AncestorIterator::new_inclusive(latest_vote_slot, &leader_blockstore) .take(MAX_LOCKOUT_HISTORY) .zip(1..) .collect(); - vote_slots.reverse(); - let mut vote = VoteStateUpdate::from(vote_slots); - let root = AncestorIterator::new_inclusive( - latest_vote_slot, - &leader_blockstore, - ) + vote_slots.reverse(); + let mut vote = VoteStateUpdate::from(vote_slots); + let root = + AncestorIterator::new_inclusive(latest_vote_slot, &leader_blockstore) .nth(MAX_LOCKOUT_HISTORY); - vote.root = root; - vote.hash = vote_hash; - let vote_tx = - vote_transaction::new_compact_vote_state_update_transaction( - vote, - leader_vote_tx.message.recent_blockhash, - &node_keypair, - &vote_keypair, - &vote_keypair, - None, - ); - gossip_vote_index += 1; - gossip_vote_index %= MAX_LOCKOUT_HISTORY; - cluster_info.push_vote_at_index(vote_tx, gossip_vote_index as u8) - } - } - // Give vote some time to propagate - sleep(Duration::from_millis(100)); - } - - if parsed_vote_iter.is_empty() { - sleep(Duration::from_millis(100)); + vote.root = root; + vote.hash = vote_hash; + let vote_tx = vote_transaction::new_compact_vote_state_update_transaction( + vote, + leader_vote_tx.message.recent_blockhash, + &node_keypair, + &vote_keypair, + &vote_keypair, + None, + ); + gossip_vote_index += 1; + gossip_vote_index %= MAX_LOCKOUT_HISTORY; + cluster_info.push_vote_at_index(vote_tx, gossip_vote_index as u8) } } - }) - }; + }, + voter_thread_sleep_ms as u64, + ); // 4) Check that the cluster is making progress cluster.check_for_new_roots( @@ -591,9 +544,7 @@ fn test_duplicate_shreds_broadcast_leader() { ); // Clean up threads - exit.store(true, Ordering::Relaxed); - t_voter.join().unwrap(); - gossip_service.join().unwrap(); + gossip_voter.close(); } #[test]