gossip: notify state machine of duplicate proofs (#32963)

* gossip: notify state machine of duplicate proofs

* Add feature flag for ingesting duplicate proofs from Gossip.

* Use the Epoch the shred is in instead of the root bank epoch.

* Fix unit test by activating the feature.

* Add a test for feature disabled case.

* EpochSchedule is no longer Copy; clone it explicitly.

* pr feedback: read epoch schedule on startup, add guard for ff recache

* pr feedback: bank_forks lock, -cached_slots_in_epoch, init ff

* pr feedback: bank_forks.try_read() -> read()

* pr feedback: fix local-cluster setup

* local-cluster: do not expose gossip internals, use retry mechanism instead

* local-cluster: split out case 4b into separate test and ignore

* pr feedback: avoid taking lock if ff is already found

* pr feedback: do not cache ff epoch

* pr feedback: bank_forks lock, revert to cached_slots_in_epoch

* pr feedback: move local variable into helper function

* pr feedback: use let else, remove epoch 0 hack

---------

Co-authored-by: Wen <crocoxu@gmail.com>
Ashwin Sekar 2024-01-26 07:58:37 -08:00 committed by GitHub
parent 663a1bb8f4
commit 93271d91b0
7 changed files with 206 additions and 49 deletions
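At a high level, this change threads a crossbeam channel for duplicate slots from Tvu into the gossip-side DuplicateShredHandler: once a complete duplicate-shred proof has been reconstructed and stored in the blockstore, the handler forwards the slot so the duplicate-slot state machine can react. Below is a minimal sketch of that wiring, not the actual implementation; the consumer thread stands in for the real state machine, which is outside this diff, and only the channel shape matches the code that follows.

use crossbeam_channel::unbounded;

type Slot = u64; // solana_sdk::clock::Slot is a u64 alias

fn main() {
    // Producer side: DuplicateShredHandler owns the sender and, after storing a
    // complete proof, forwards the slot (subject to the feature/epoch gate shown later).
    let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded::<Slot>();

    // Consumer side: a stand-in for whatever drives the duplicate-slot state machine.
    let consumer = std::thread::spawn(move || {
        for slot in duplicate_slots_receiver.iter() {
            println!("duplicate proof received for slot {slot}");
        }
    });

    // In the diff below, this sender is what Tvu clones and hands to
    // DuplicateShredHandler::new(blockstore, leader_schedule_cache, bank_forks, sender).
    duplicate_slots_sender.send(42).unwrap();
    drop(duplicate_slots_sender); // disconnecting the channel ends the consumer loop
    consumer.join().unwrap();
}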


@@ -228,7 +228,7 @@ impl Tvu {
leader_schedule_cache.clone(),
verified_vote_receiver,
completed_data_sets_sender,
duplicate_slots_sender,
duplicate_slots_sender.clone(),
ancestor_hashes_replay_update_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
@@ -337,6 +337,7 @@ impl Tvu {
blockstore,
leader_schedule_cache.clone(),
bank_forks.clone(),
duplicate_slots_sender,
),
);


@@ -56,6 +56,8 @@ pub enum Error {
BlockstoreInsertFailed(#[from] BlockstoreError),
#[error("data chunk mismatch")]
DataChunkMismatch,
#[error("unable to send duplicate slot to state machine")]
DuplicateSlotSenderFailure,
#[error("invalid chunk_index: {chunk_index}, num_chunks: {num_chunks}")]
InvalidChunkIndex { chunk_index: u8, num_chunks: u8 },
#[error("invalid duplicate shreds")]


@@ -3,11 +3,13 @@ use {
duplicate_shred::{self, DuplicateShred, Error},
duplicate_shred_listener::DuplicateShredHandlerTrait,
},
crossbeam_channel::Sender,
log::error,
solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
solana_runtime::bank_forks::BankForks,
solana_sdk::{
clock::{Epoch, Slot},
feature_set,
pubkey::Pubkey,
},
std::{
@@ -44,6 +46,8 @@ pub struct DuplicateShredHandler {
cached_on_epoch: Epoch,
cached_staked_nodes: Arc<HashMap<Pubkey, u64>>,
cached_slots_in_epoch: u64,
// Used to notify duplicate consensus state machine
duplicate_slots_sender: Sender<Slot>,
}
impl DuplicateShredHandlerTrait for DuplicateShredHandler {
@@ -63,6 +67,7 @@ impl DuplicateShredHandler {
blockstore: Arc<Blockstore>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
bank_forks: Arc<RwLock<BankForks>>,
duplicate_slots_sender: Sender<Slot>,
) -> Self {
Self {
buffer: HashMap::<(Slot, Pubkey), BufferEntry>::default(),
@@ -74,6 +79,7 @@ impl DuplicateShredHandler {
blockstore,
leader_schedule_cache,
bank_forks,
duplicate_slots_sender,
}
}
@@ -131,12 +137,30 @@ impl DuplicateShredHandler {
shred1.into_payload(),
shred2.into_payload(),
)?;
if self.should_notify_state_machine(slot) {
// Notify duplicate consensus state machine
self.duplicate_slots_sender
.send(slot)
.map_err(|_| Error::DuplicateSlotSenderFailure)?;
}
}
self.consumed.insert(slot, true);
}
Ok(())
}
fn should_notify_state_machine(&self, slot: Slot) -> bool {
let root_bank = self.bank_forks.read().unwrap().root_bank();
let Some(activated_slot) = root_bank
.feature_set
.activated_slot(&feature_set::enable_gossip_duplicate_proof_ingestion::id())
else {
return false;
};
root_bank.epoch_schedule().get_epoch(slot)
> root_bank.epoch_schedule().get_epoch(activated_slot)
}
fn should_consume_slot(&mut self, slot: Slot) -> bool {
slot > self.last_root
&& slot < self.last_root.saturating_add(self.cached_slots_in_epoch)
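The should_notify_state_machine check above is deliberately strict: a proof is forwarded only when the slot it refers to falls in an epoch after the one in which the feature was activated, so every validator starts honoring gossip proofs at the same epoch boundary. A small worked example of the gate; the 32-slot, no-warmup schedule and the activation slot are assumptions chosen for illustration, not production values.

use solana_sdk::epoch_schedule::EpochSchedule;

fn main() {
    // Assumption for illustration: fixed 32-slot epochs, no warmup period.
    let epoch_schedule = EpochSchedule::custom(32, 32, false);
    let activated_slot: u64 = 10; // hypothetical activation slot, i.e. epoch 0

    let forwarded =
        |slot: u64| epoch_schedule.get_epoch(slot) > epoch_schedule.get_epoch(activated_slot);

    assert!(!forwarded(20)); // same epoch as the activation: not forwarded
    assert!(!forwarded(31)); // still epoch 0: not forwarded
    assert!(forwarded(33)); // epoch 1 > epoch 0: sent to the state machine
}

This is also why the tests below activate the feature in the genesis bank but use start_slot = slots_in_epoch + 1: a genesis activation lands in epoch 0, so only proofs for slots in epoch 1 and later are forwarded.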
@@ -211,12 +235,14 @@ mod tests {
cluster_info::DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
duplicate_shred::{from_shred, tests::new_rand_shred},
},
crossbeam_channel::unbounded,
itertools::Itertools,
solana_ledger::{
genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
get_tmp_ledger_path_auto_delete,
shred::Shredder,
},
solana_runtime::bank::Bank,
solana_runtime::{accounts_background_service::AbsRequestSender, bank::Bank},
solana_sdk::{
signature::{Keypair, Signer},
timing::timestamp,
@@ -271,16 +297,34 @@ mod tests {
let my_pubkey = my_keypair.pubkey();
let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
let mut bank = Bank::new_for_tests(&genesis_config);
bank.activate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
let slots_in_epoch = bank.get_epoch_info().slots_in_epoch;
let bank_forks_arc = BankForks::new_rw_arc(bank);
{
let mut bank_forks = bank_forks_arc.write().unwrap();
let bank0 = bank_forks.get(0).unwrap();
bank_forks.insert(Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 9));
bank_forks.set_root(9, &AbsRequestSender::default(), None);
}
blockstore.set_roots([0, 9].iter()).unwrap();
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
&bank_forks.read().unwrap().working_bank(),
&bank_forks_arc.read().unwrap().working_bank(),
));
let mut duplicate_shred_handler =
DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks);
let (sender, receiver) = unbounded();
// The feature is activated at genesis, but only takes effect for slots in epoch 1 and later.
let start_slot: Slot = slots_in_epoch + 1;
let mut duplicate_shred_handler = DuplicateShredHandler::new(
blockstore.clone(),
leader_schedule_cache,
bank_forks_arc,
sender,
);
let chunks = create_duplicate_proof(
my_keypair.clone(),
None,
1,
start_slot,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)
@@ -288,20 +332,24 @@ mod tests {
let chunks1 = create_duplicate_proof(
my_keypair.clone(),
None,
2,
start_slot + 1,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)
.unwrap();
assert!(!blockstore.has_duplicate_shreds_in_slot(1));
assert!(!blockstore.has_duplicate_shreds_in_slot(2));
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot + 1));
// Test that even when chunks from the two proofs arrive interleaved, both proofs are stored correctly.
for (chunk1, chunk2) in chunks.zip(chunks1) {
duplicate_shred_handler.handle(chunk1);
duplicate_shred_handler.handle(chunk2);
}
assert!(blockstore.has_duplicate_shreds_in_slot(1));
assert!(blockstore.has_duplicate_shreds_in_slot(2));
assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
assert!(blockstore.has_duplicate_shreds_in_slot(start_slot + 1));
assert_eq!(
receiver.try_iter().collect_vec(),
vec![start_slot, start_slot + 1]
);
// Test all kinds of bad proofs.
for error in [
@@ -312,7 +360,7 @@ mod tests {
match create_duplicate_proof(
my_keypair.clone(),
None,
3,
start_slot + 2,
Some(error),
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
) {
@@ -321,7 +369,8 @@ mod tests {
for chunk in chunks {
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(3));
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot + 2));
assert!(receiver.is_empty());
}
}
}
@@ -337,13 +386,29 @@ mod tests {
let my_pubkey = my_keypair.pubkey();
let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
let mut bank = Bank::new_for_tests(&genesis_config);
bank.activate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
let slots_in_epoch = bank.get_epoch_info().slots_in_epoch;
let bank_forks_arc = BankForks::new_rw_arc(bank);
{
let mut bank_forks = bank_forks_arc.write().unwrap();
let bank0 = bank_forks.get(0).unwrap();
bank_forks.insert(Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 9));
bank_forks.set_root(9, &AbsRequestSender::default(), None);
}
blockstore.set_roots([0, 9].iter()).unwrap();
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
&bank_forks.read().unwrap().working_bank(),
&bank_forks_arc.read().unwrap().working_bank(),
));
let mut duplicate_shred_handler =
DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks);
let start_slot: Slot = 1;
let (sender, receiver) = unbounded();
let mut duplicate_shred_handler = DuplicateShredHandler::new(
blockstore.clone(),
leader_schedule_cache,
bank_forks_arc,
sender,
);
// The feature is activated at genesis, but only takes effect for slots in epoch 1 and later.
let start_slot: Slot = slots_in_epoch + 1;
// This proof will not be accepted because num_chunks is too large.
let chunks = create_duplicate_proof(
@@ -358,6 +423,7 @@ mod tests {
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
assert!(receiver.is_empty());
// This proof will be rejected because the slot is too far away in the future.
let future_slot =
@@ -374,6 +440,7 @@ mod tests {
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(future_slot));
assert!(receiver.is_empty());
// Send in two proofs for the same slot: the first proof to arrive is accepted, any
// later proofs for that slot are discarded.
@@ -388,10 +455,54 @@ mod tests {
// handle chunk 0 of the first proof.
duplicate_shred_handler.handle(chunks.next().unwrap());
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
assert!(receiver.is_empty());
// Now send in the rest of the first proof, it will succeed.
for chunk in chunks {
duplicate_shred_handler.handle(chunk);
}
assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
assert_eq!(receiver.try_iter().collect_vec(), vec![start_slot]);
}
#[test]
fn test_feature_disabled() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let my_keypair = Arc::new(Keypair::new());
let my_pubkey = my_keypair.pubkey();
let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
let mut bank = Bank::new_for_tests(&genesis_config);
bank.deactivate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
assert!(!bank
.feature_set
.is_active(&feature_set::enable_gossip_duplicate_proof_ingestion::id()));
let bank_forks_arc = BankForks::new_rw_arc(bank);
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
&bank_forks_arc.read().unwrap().working_bank(),
));
let (sender, receiver) = unbounded();
let mut duplicate_shred_handler = DuplicateShredHandler::new(
blockstore.clone(),
leader_schedule_cache,
bank_forks_arc,
sender,
);
let chunks = create_duplicate_proof(
my_keypair.clone(),
None,
1,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)
.unwrap();
assert!(!blockstore.has_duplicate_shreds_in_slot(1));
for chunk in chunks {
duplicate_shred_handler.handle(chunk);
}
// With the feature disabled, the proof is still stored in the blockstore, but the
// duplicate-slot state machine is not notified.
assert!(blockstore.has_duplicate_shreds_in_slot(1));
assert!(receiver.try_iter().collect_vec().is_empty());
}
}


@@ -41,7 +41,7 @@ use {
solana_vote_program::vote_transaction,
std::{
borrow::Borrow,
collections::{HashMap, HashSet},
collections::{HashMap, HashSet, VecDeque},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener},
path::Path,
sync::{
@@ -489,6 +489,9 @@ pub fn start_gossip_voter(
+ std::marker::Send
+ 'static,
sleep_ms: u64,
num_expected_peers: usize,
refresh_ms: u64,
max_votes_to_refresh: usize,
) -> GossipVoter {
let exit = Arc::new(AtomicBool::new(false));
let (gossip_service, tcp_listener, cluster_info) = gossip_service::make_gossip_node(
@@ -503,6 +506,15 @@ pub fn start_gossip_voter(
SocketAddrSpace::Unspecified,
);
// Wait for peer discovery
while cluster_info.gossip_peers().len() < num_expected_peers {
sleep(Duration::from_millis(sleep_ms));
}
let mut latest_voted_slot = 0;
let mut refreshable_votes: VecDeque<(Transaction, VoteTransaction)> = VecDeque::new();
let mut latest_push_attempt = Instant::now();
let t_voter = {
let exit = exit.clone();
let cluster_info = cluster_info.clone();
@@ -514,6 +526,18 @@ pub fn start_gossip_voter(
}
let (labels, votes) = cluster_info.get_votes_with_labels(&mut cursor);
if labels.is_empty() {
if latest_push_attempt.elapsed() > Duration::from_millis(refresh_ms) {
for (leader_vote_tx, parsed_vote) in refreshable_votes.iter().rev() {
let vote_slot = parsed_vote.last_voted_slot().unwrap();
info!("gossip voter refreshing vote {}", vote_slot);
process_vote_tx(vote_slot, leader_vote_tx, parsed_vote, &cluster_info);
latest_push_attempt = Instant::now();
}
}
sleep(Duration::from_millis(sleep_ms));
continue;
}
let mut parsed_vote_iter: Vec<_> = labels
.into_iter()
.zip(votes)
@@ -527,22 +551,20 @@ pub fn start_gossip_voter(
});
for (parsed_vote, leader_vote_tx) in &parsed_vote_iter {
if let Some(latest_vote_slot) = parsed_vote.last_voted_slot() {
info!("received vote for {}", latest_vote_slot);
process_vote_tx(
latest_vote_slot,
leader_vote_tx,
parsed_vote,
&cluster_info,
)
if let Some(vote_slot) = parsed_vote.last_voted_slot() {
info!("received vote for {}", vote_slot);
if vote_slot > latest_voted_slot {
latest_voted_slot = vote_slot;
refreshable_votes
.push_front((leader_vote_tx.clone(), parsed_vote.clone()));
refreshable_votes.truncate(max_votes_to_refresh);
}
process_vote_tx(vote_slot, leader_vote_tx, parsed_vote, &cluster_info);
latest_push_attempt = Instant::now();
}
// Give vote some time to propagate
sleep(Duration::from_millis(sleep_ms));
}
if parsed_vote_iter.is_empty() {
sleep(Duration::from_millis(sleep_ms));
}
}
})
};
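The retry path added to start_gossip_voter keeps the most recent max_votes_to_refresh votes in a VecDeque with the newest at the front and, when the gossip feed has been idle for longer than refresh_ms, re-pushes them oldest-first by iterating the deque in reverse. A self-contained sketch of just that bookkeeping, using bare slot numbers in place of the (Transaction, VoteTransaction) pairs the real code stores:

use std::collections::VecDeque;

fn main() {
    let max_votes_to_refresh = 3;
    let mut refreshable_votes: VecDeque<u64> = VecDeque::new();

    for vote_slot in [10u64, 11, 12, 13] {
        refreshable_votes.push_front(vote_slot); // newest vote goes to the front
        refreshable_votes.truncate(max_votes_to_refresh); // keep only the newest N
    }
    assert_eq!(
        refreshable_votes.iter().copied().collect::<Vec<_>>(),
        vec![13, 12, 11]
    );

    // On a refresh pass, iterate in reverse so the oldest retained vote is re-pushed
    // first and the newest vote is the last one peers see.
    let refresh_order: Vec<_> = refreshable_votes.iter().rev().copied().collect();
    assert_eq!(refresh_order, vec![11, 12, 13]);
}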


@@ -2745,6 +2745,9 @@ fn test_oc_bad_signatures() {
}
},
voter_thread_sleep_ms as u64,
cluster.validators.len().saturating_sub(1),
0,
0,
);
let (mut block_subscribe_client, receiver) = PubsubClient::block_subscribe(
@@ -3745,6 +3748,18 @@ fn test_kill_partition_switch_threshold_progress() {
#[serial]
#[allow(unused_attributes)]
fn test_duplicate_shreds_broadcast_leader() {
run_duplicate_shreds_broadcast_leader(true);
}
#[test]
#[serial]
#[ignore]
#[allow(unused_attributes)]
fn test_duplicate_shreds_broadcast_leader_ancestor_hashes() {
run_duplicate_shreds_broadcast_leader(false);
}
fn run_duplicate_shreds_broadcast_leader(vote_on_duplicate: bool) {
solana_logger::setup_with_default(RUST_LOG_FILTER);
// Create 4 nodes:
// 1) Bad leader sending different versions of shreds to both of the other nodes
// 2) 1 node whose voting behavior in gossip
@@ -3795,11 +3810,13 @@ fn test_duplicate_shreds_broadcast_leader() {
// for the partition.
assert!(partition_node_stake < our_node_stake && partition_node_stake < good_node_stake);
let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
// 1) Set up the cluster
let (mut cluster, validator_keys) = test_faulty_node(
BroadcastStageType::BroadcastDuplicates(BroadcastDuplicatesConfig {
partition: ClusterPartition::Stake(partition_node_stake),
duplicate_slot_sender: None,
duplicate_slot_sender: Some(duplicate_slot_sender),
}),
node_stakes,
None,
@@ -3841,27 +3858,23 @@ fn test_duplicate_shreds_broadcast_leader() {
{
let node_keypair = node_keypair.insecure_clone();
let vote_keypair = vote_keypair.insecure_clone();
let mut max_vote_slot = 0;
let mut gossip_vote_index = 0;
let mut duplicate_slots = vec![];
move |latest_vote_slot, leader_vote_tx, parsed_vote, cluster_info| {
info!("received vote for {}", latest_vote_slot);
// Add to EpochSlots. Mark all slots frozen between slot..=max_vote_slot.
if latest_vote_slot > max_vote_slot {
let new_epoch_slots: Vec<Slot> =
(max_vote_slot + 1..latest_vote_slot + 1).collect();
info!(
"Simulating epoch slots from our node: {:?}",
new_epoch_slots
);
cluster_info.push_epoch_slots(&new_epoch_slots);
max_vote_slot = latest_vote_slot;
}
let new_epoch_slots: Vec<Slot> = (0..latest_vote_slot + 1).collect();
info!(
"Simulating epoch slots from our node: {:?}",
new_epoch_slots
);
cluster_info.push_epoch_slots(&new_epoch_slots);
// Only vote on even slots. Note this may violate lockouts if the
// validator started voting on a different fork before we could exit
// it above.
for slot in duplicate_slot_receiver.try_iter() {
duplicate_slots.push(slot);
}
let vote_hash = parsed_vote.hash();
if latest_vote_slot % 2 == 0 {
if vote_on_duplicate || !duplicate_slots.contains(&latest_vote_slot) {
info!(
"Simulating vote from our node on slot {}, hash {}",
latest_vote_slot, vote_hash
@@ -3899,6 +3912,9 @@ fn test_duplicate_shreds_broadcast_leader() {
}
},
voter_thread_sleep_ms as u64,
cluster.validators.len().saturating_sub(1),
5000, // Refresh if 5 seconds of inactivity
5, // Refresh the past 5 votes
);
// 4) Check that the cluster is making progress


@@ -772,6 +772,10 @@ pub mod cost_model_requested_write_lock_cost {
solana_sdk::declare_id!("wLckV1a64ngtcKPRGU4S4grVTestXjmNjxBjaKZrAcn");
}
pub mod enable_gossip_duplicate_proof_ingestion {
solana_sdk::declare_id!("FNKCMBzYUdjhHyPdsKG2LSmdzH8TCHXn3ytj8RNBS4nG");
}
lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@@ -960,6 +964,7 @@ lazy_static! {
(enable_zk_proof_from_account::id(), "Enable zk token proof program to read proof from accounts instead of instruction data #34750"),
(curve25519_restrict_msm_length::id(), "restrict curve25519 multiscalar multiplication vector lengths #34763"),
(cost_model_requested_write_lock_cost::id(), "cost model uses number of requested write locks #34819"),
(enable_gossip_duplicate_proof_ingestion::id(), "enable gossip duplicate proof ingestion #32963"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()


@@ -6,7 +6,7 @@ use {
solana_vote_program::vote_state::{Vote, VoteStateUpdate},
};
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum VoteTransaction {
Vote(Vote),
VoteStateUpdate(VoteStateUpdate),