Add voting service (#18552)
This commit is contained in:
parent
e6f32f6ae0
commit
0f8bcf65af
|
@ -59,6 +59,7 @@ pub mod validator;
|
|||
pub mod verified_vote_packets;
|
||||
pub mod vote_simulator;
|
||||
pub mod vote_stake_tracker;
|
||||
pub mod voting_service;
|
||||
pub mod window_service;
|
||||
|
||||
#[macro_use]
|
||||
|
|
|
@ -20,6 +20,7 @@ use crate::{
|
|||
repair_service::DuplicateSlotsResetReceiver,
|
||||
rewards_recorder_service::RewardsRecorderSender,
|
||||
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
|
||||
voting_service::VoteOp,
|
||||
window_service::DuplicateSlotReceiver,
|
||||
};
|
||||
use solana_client::rpc_response::SlotUpdate;
|
||||
|
@ -316,6 +317,7 @@ impl ReplayStage {
|
|||
gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
|
||||
cluster_slots_update_sender: ClusterSlotsUpdateSender,
|
||||
cost_update_sender: Sender<ExecuteTimings>,
|
||||
voting_sender: Sender<VoteOp>,
|
||||
) -> Self {
|
||||
let ReplayStageConfig {
|
||||
vote_account,
|
||||
|
@ -512,15 +514,17 @@ impl ReplayStage {
|
|||
|
||||
if let Some(heaviest_bank_on_same_voted_fork) = heaviest_bank_on_same_voted_fork.as_ref() {
|
||||
if let Some(my_latest_landed_vote) = progress.my_latest_landed_vote(heaviest_bank_on_same_voted_fork.slot()) {
|
||||
Self::refresh_last_vote(&mut tower, &cluster_info,
|
||||
Self::refresh_last_vote(&mut tower,
|
||||
heaviest_bank_on_same_voted_fork,
|
||||
&poh_recorder, my_latest_landed_vote,
|
||||
my_latest_landed_vote,
|
||||
&vote_account,
|
||||
&identity_keypair,
|
||||
&authorized_voter_keypairs.read().unwrap(),
|
||||
&mut voted_signatures,
|
||||
has_new_vote_been_rooted, &mut
|
||||
last_vote_refresh_time);
|
||||
last_vote_refresh_time,
|
||||
&voting_sender,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -578,7 +582,6 @@ impl ReplayStage {
|
|||
|
||||
Self::handle_votable_bank(
|
||||
vote_bank,
|
||||
&poh_recorder,
|
||||
switch_fork_decision,
|
||||
&bank_forks,
|
||||
&mut tower,
|
||||
|
@ -586,7 +589,6 @@ impl ReplayStage {
|
|||
&vote_account,
|
||||
&identity_keypair,
|
||||
&authorized_voter_keypairs.read().unwrap(),
|
||||
&cluster_info,
|
||||
&blockstore,
|
||||
&leader_schedule_cache,
|
||||
&lockouts_sender,
|
||||
|
@ -602,6 +604,7 @@ impl ReplayStage {
|
|||
&mut voted_signatures,
|
||||
&mut has_new_vote_been_rooted,
|
||||
&mut replay_timing,
|
||||
&voting_sender,
|
||||
);
|
||||
};
|
||||
voting_time.stop();
|
||||
|
@ -1430,7 +1433,6 @@ impl ReplayStage {
|
|||
#[allow(clippy::too_many_arguments)]
|
||||
fn handle_votable_bank(
|
||||
bank: &Arc<Bank>,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
switch_fork_decision: &SwitchForkDecision,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
tower: &mut Tower,
|
||||
|
@ -1438,7 +1440,6 @@ impl ReplayStage {
|
|||
vote_account_pubkey: &Pubkey,
|
||||
identity_keypair: &Keypair,
|
||||
authorized_voter_keypairs: &[Arc<Keypair>],
|
||||
cluster_info: &Arc<ClusterInfo>,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
||||
lockouts_sender: &Sender<CommitmentAggregationData>,
|
||||
|
@ -1454,6 +1455,7 @@ impl ReplayStage {
|
|||
vote_signatures: &mut Vec<Signature>,
|
||||
has_new_vote_been_rooted: &mut bool,
|
||||
replay_timing: &mut ReplayTiming,
|
||||
voting_sender: &Sender<VoteOp>,
|
||||
) {
|
||||
if bank.is_empty() {
|
||||
inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
|
||||
|
@ -1529,9 +1531,7 @@ impl ReplayStage {
|
|||
replay_timing.update_commitment_cache_us += update_commitment_cache_time.as_us();
|
||||
|
||||
Self::push_vote(
|
||||
cluster_info,
|
||||
bank,
|
||||
poh_recorder,
|
||||
vote_account_pubkey,
|
||||
identity_keypair,
|
||||
authorized_voter_keypairs,
|
||||
|
@ -1540,6 +1540,7 @@ impl ReplayStage {
|
|||
vote_signatures,
|
||||
*has_new_vote_been_rooted,
|
||||
replay_timing,
|
||||
voting_sender,
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -1641,9 +1642,7 @@ impl ReplayStage {
|
|||
#[allow(clippy::too_many_arguments)]
|
||||
fn refresh_last_vote(
|
||||
tower: &mut Tower,
|
||||
cluster_info: &ClusterInfo,
|
||||
heaviest_bank_on_same_fork: &Bank,
|
||||
poh_recorder: &Mutex<PohRecorder>,
|
||||
my_latest_landed_vote: Slot,
|
||||
vote_account_pubkey: &Pubkey,
|
||||
identity_keypair: &Keypair,
|
||||
|
@ -1651,6 +1650,7 @@ impl ReplayStage {
|
|||
vote_signatures: &mut Vec<Signature>,
|
||||
has_new_vote_been_rooted: bool,
|
||||
last_vote_refresh_time: &mut LastVoteRefreshTime,
|
||||
voting_sender: &Sender<VoteOp>,
|
||||
) {
|
||||
let last_voted_slot = tower.last_voted_slot();
|
||||
if last_voted_slot.is_none() {
|
||||
|
@ -1707,20 +1707,19 @@ impl ReplayStage {
|
|||
("target_bank_slot", heaviest_bank_on_same_fork.slot(), i64),
|
||||
("target_bank_hash", hash_string, String),
|
||||
);
|
||||
let _ = cluster_info.send_transaction(
|
||||
&vote_tx,
|
||||
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder),
|
||||
);
|
||||
cluster_info.refresh_vote(vote_tx, last_voted_slot);
|
||||
voting_sender
|
||||
.send(VoteOp::RefreshVote {
|
||||
tx: vote_tx,
|
||||
last_voted_slot,
|
||||
})
|
||||
.unwrap_or_else(|err| warn!("Error: {:?}", err));
|
||||
last_vote_refresh_time.last_refresh_time = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn push_vote(
|
||||
cluster_info: &ClusterInfo,
|
||||
bank: &Bank,
|
||||
poh_recorder: &Mutex<PohRecorder>,
|
||||
vote_account_pubkey: &Pubkey,
|
||||
identity_keypair: &Keypair,
|
||||
authorized_voter_keypairs: &[Arc<Keypair>],
|
||||
|
@ -1729,6 +1728,7 @@ impl ReplayStage {
|
|||
vote_signatures: &mut Vec<Signature>,
|
||||
has_new_vote_been_rooted: bool,
|
||||
replay_timing: &mut ReplayTiming,
|
||||
voting_sender: &Sender<VoteOp>,
|
||||
) {
|
||||
let mut generate_time = Measure::start("generate_vote");
|
||||
let vote_tx = Self::generate_vote_tx(
|
||||
|
@ -1745,16 +1745,14 @@ impl ReplayStage {
|
|||
replay_timing.generate_vote_us += generate_time.as_us();
|
||||
if let Some(vote_tx) = vote_tx {
|
||||
tower.refresh_last_vote_tx_blockhash(vote_tx.message.recent_blockhash);
|
||||
let mut send_time = Measure::start("send_vote");
|
||||
let _ = cluster_info.send_transaction(
|
||||
&vote_tx,
|
||||
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder),
|
||||
);
|
||||
send_time.stop();
|
||||
let mut push_time = Measure::start("push_vote");
|
||||
cluster_info.push_vote(&tower.tower_slots(), vote_tx);
|
||||
push_time.stop();
|
||||
replay_timing.vote_push_us += push_time.as_us();
|
||||
|
||||
let tower_slots = tower.tower_slots();
|
||||
voting_sender
|
||||
.send(VoteOp::PushVote {
|
||||
tx: vote_tx,
|
||||
tower_slots,
|
||||
})
|
||||
.unwrap_or_else(|err| warn!("Error: {:?}", err));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2723,6 +2721,7 @@ mod tests {
|
|||
vote_state::{VoteState, VoteStateVersions},
|
||||
vote_transaction,
|
||||
};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::{
|
||||
fs::remove_dir_all,
|
||||
iter,
|
||||
|
@ -5349,15 +5348,14 @@ mod tests {
|
|||
}
|
||||
}
|
||||
}
|
||||
let (voting_sender, voting_receiver) = channel();
|
||||
|
||||
// Simulate landing a vote for slot 0 landing in slot 1
|
||||
let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
|
||||
fill_bank_with_ticks(&bank1);
|
||||
tower.record_bank_vote(&bank0, &my_vote_pubkey);
|
||||
ReplayStage::push_vote(
|
||||
&cluster_info,
|
||||
&bank0,
|
||||
&poh_recorder,
|
||||
&my_vote_pubkey,
|
||||
&identity_keypair,
|
||||
&my_vote_keypair,
|
||||
|
@ -5366,7 +5364,13 @@ mod tests {
|
|||
&mut voted_signatures,
|
||||
has_new_vote_been_rooted,
|
||||
&mut ReplayTiming::default(),
|
||||
&voting_sender,
|
||||
);
|
||||
let vote_info = voting_receiver
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.unwrap();
|
||||
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);
|
||||
|
||||
let mut cursor = Cursor::default();
|
||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||
assert_eq!(votes.len(), 1);
|
||||
|
@ -5385,9 +5389,7 @@ mod tests {
|
|||
for refresh_bank in &[&bank1, &bank2] {
|
||||
ReplayStage::refresh_last_vote(
|
||||
&mut tower,
|
||||
&cluster_info,
|
||||
refresh_bank,
|
||||
&poh_recorder,
|
||||
Tower::last_voted_slot_in_bank(refresh_bank, &my_vote_pubkey).unwrap(),
|
||||
&my_vote_pubkey,
|
||||
&identity_keypair,
|
||||
|
@ -5395,6 +5397,7 @@ mod tests {
|
|||
&mut voted_signatures,
|
||||
has_new_vote_been_rooted,
|
||||
&mut last_vote_refresh_time,
|
||||
&voting_sender,
|
||||
);
|
||||
|
||||
// No new votes have been submitted to gossip
|
||||
|
@ -5409,9 +5412,7 @@ mod tests {
|
|||
// not landing
|
||||
tower.record_bank_vote(&bank1, &my_vote_pubkey);
|
||||
ReplayStage::push_vote(
|
||||
&cluster_info,
|
||||
&bank1,
|
||||
&poh_recorder,
|
||||
&my_vote_pubkey,
|
||||
&identity_keypair,
|
||||
&my_vote_keypair,
|
||||
|
@ -5420,7 +5421,12 @@ mod tests {
|
|||
&mut voted_signatures,
|
||||
has_new_vote_been_rooted,
|
||||
&mut ReplayTiming::default(),
|
||||
&voting_sender,
|
||||
);
|
||||
let vote_info = voting_receiver
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.unwrap();
|
||||
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);
|
||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||
assert_eq!(votes.len(), 1);
|
||||
let vote_tx = &votes[0];
|
||||
|
@ -5432,9 +5438,7 @@ mod tests {
|
|||
// the last vote has not expired yet
|
||||
ReplayStage::refresh_last_vote(
|
||||
&mut tower,
|
||||
&cluster_info,
|
||||
&bank2,
|
||||
&poh_recorder,
|
||||
Tower::last_voted_slot_in_bank(&bank2, &my_vote_pubkey).unwrap(),
|
||||
&my_vote_pubkey,
|
||||
&identity_keypair,
|
||||
|
@ -5442,7 +5446,9 @@ mod tests {
|
|||
&mut voted_signatures,
|
||||
has_new_vote_been_rooted,
|
||||
&mut last_vote_refresh_time,
|
||||
&voting_sender,
|
||||
);
|
||||
|
||||
// No new votes have been submitted to gossip
|
||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||
assert!(votes.is_empty());
|
||||
|
@ -5469,9 +5475,7 @@ mod tests {
|
|||
let clone_refresh_time = last_vote_refresh_time.last_refresh_time;
|
||||
ReplayStage::refresh_last_vote(
|
||||
&mut tower,
|
||||
&cluster_info,
|
||||
&expired_bank,
|
||||
&poh_recorder,
|
||||
Tower::last_voted_slot_in_bank(&expired_bank, &my_vote_pubkey).unwrap(),
|
||||
&my_vote_pubkey,
|
||||
&identity_keypair,
|
||||
|
@ -5479,7 +5483,13 @@ mod tests {
|
|||
&mut voted_signatures,
|
||||
has_new_vote_been_rooted,
|
||||
&mut last_vote_refresh_time,
|
||||
&voting_sender,
|
||||
);
|
||||
let vote_info = voting_receiver
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.unwrap();
|
||||
crate::voting_service::VotingService::handle_vote(&cluster_info, &poh_recorder, vote_info);
|
||||
|
||||
assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time);
|
||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||
assert_eq!(votes.len(), 1);
|
||||
|
@ -5526,9 +5536,7 @@ mod tests {
|
|||
last_vote_refresh_time.last_refresh_time = Instant::now();
|
||||
ReplayStage::refresh_last_vote(
|
||||
&mut tower,
|
||||
&cluster_info,
|
||||
&expired_bank_sibling,
|
||||
&poh_recorder,
|
||||
Tower::last_voted_slot_in_bank(&expired_bank_sibling, &my_vote_pubkey).unwrap(),
|
||||
&my_vote_pubkey,
|
||||
&identity_keypair,
|
||||
|
@ -5536,7 +5544,9 @@ mod tests {
|
|||
&mut voted_signatures,
|
||||
has_new_vote_been_rooted,
|
||||
&mut last_vote_refresh_time,
|
||||
&voting_sender,
|
||||
);
|
||||
|
||||
let (_, votes) = cluster_info.get_votes(&mut cursor);
|
||||
assert!(votes.is_empty());
|
||||
assert_eq!(
|
||||
|
|
|
@ -22,6 +22,7 @@ use crate::{
|
|||
sigverify_shreds::ShredSigVerifier,
|
||||
sigverify_stage::SigVerifyStage,
|
||||
snapshot_packager_service::PendingSnapshotPackage,
|
||||
voting_service::VotingService,
|
||||
};
|
||||
use crossbeam_channel::unbounded;
|
||||
use solana_gossip::cluster_info::ClusterInfo;
|
||||
|
@ -67,6 +68,7 @@ pub struct Tvu {
|
|||
accounts_background_service: AccountsBackgroundService,
|
||||
accounts_hash_verifier: AccountsHashVerifier,
|
||||
cost_update_service: CostUpdateService,
|
||||
voting_service: VotingService,
|
||||
}
|
||||
|
||||
pub struct Sockets {
|
||||
|
@ -273,6 +275,10 @@ impl Tvu {
|
|||
wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
|
||||
};
|
||||
|
||||
let (voting_sender, voting_receiver) = channel();
|
||||
let voting_service =
|
||||
VotingService::new(voting_receiver, cluster_info.clone(), poh_recorder.clone());
|
||||
|
||||
let (cost_update_sender, cost_update_receiver): (
|
||||
Sender<ExecuteTimings>,
|
||||
Receiver<ExecuteTimings>,
|
||||
|
@ -302,6 +308,7 @@ impl Tvu {
|
|||
gossip_verified_vote_hash_receiver,
|
||||
cluster_slots_update_sender,
|
||||
cost_update_sender,
|
||||
voting_sender,
|
||||
);
|
||||
|
||||
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
|
||||
|
@ -333,6 +340,7 @@ impl Tvu {
|
|||
accounts_background_service,
|
||||
accounts_hash_verifier,
|
||||
cost_update_service,
|
||||
voting_service,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -347,6 +355,7 @@ impl Tvu {
|
|||
self.replay_stage.join()?;
|
||||
self.accounts_hash_verifier.join()?;
|
||||
self.cost_update_service.join()?;
|
||||
self.voting_service.join()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
use solana_gossip::cluster_info::ClusterInfo;
|
||||
use solana_poh::poh_recorder::PohRecorder;
|
||||
use solana_sdk::{clock::Slot, transaction::Transaction};
|
||||
use std::{
|
||||
sync::{mpsc::Receiver, Arc, Mutex},
|
||||
thread::{self, Builder, JoinHandle},
|
||||
};
|
||||
|
||||
pub enum VoteOp {
|
||||
PushVote {
|
||||
tx: Transaction,
|
||||
tower_slots: Vec<Slot>,
|
||||
},
|
||||
RefreshVote {
|
||||
tx: Transaction,
|
||||
last_voted_slot: Slot,
|
||||
},
|
||||
}
|
||||
|
||||
impl VoteOp {
|
||||
fn tx(&self) -> &Transaction {
|
||||
match self {
|
||||
VoteOp::PushVote { tx, tower_slots: _ } => tx,
|
||||
VoteOp::RefreshVote {
|
||||
tx,
|
||||
last_voted_slot: _,
|
||||
} => tx,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct VotingService {
|
||||
thread_hdl: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl VotingService {
|
||||
pub fn new(
|
||||
vote_receiver: Receiver<VoteOp>,
|
||||
cluster_info: Arc<ClusterInfo>,
|
||||
poh_recorder: Arc<Mutex<PohRecorder>>,
|
||||
) -> Self {
|
||||
let thread_hdl = Builder::new()
|
||||
.name("sol-vote-service".to_string())
|
||||
.spawn(move || {
|
||||
for vote_op in vote_receiver.iter() {
|
||||
Self::handle_vote(&cluster_info, &poh_recorder, vote_op);
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
Self { thread_hdl }
|
||||
}
|
||||
|
||||
pub fn handle_vote(
|
||||
cluster_info: &ClusterInfo,
|
||||
poh_recorder: &Mutex<PohRecorder>,
|
||||
vote_op: VoteOp,
|
||||
) {
|
||||
let _ = cluster_info.send_transaction(
|
||||
vote_op.tx(),
|
||||
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder),
|
||||
);
|
||||
|
||||
match vote_op {
|
||||
VoteOp::PushVote { tx, tower_slots } => {
|
||||
cluster_info.push_vote(&tower_slots, tx);
|
||||
}
|
||||
VoteOp::RefreshVote {
|
||||
tx,
|
||||
last_voted_slot,
|
||||
} => {
|
||||
cluster_info.refresh_vote(tx, last_voted_slot);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn join(self) -> thread::Result<()> {
|
||||
self.thread_hdl.join()
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue