BankingStage Refactor: Separate Next Leader Functions (#29401)
Separate next_leader functions
This commit is contained in:
parent
64c13b74d8
commit
5fc83a3d19
|
@ -14,6 +14,7 @@ use {
|
|||
leader_slot_banking_stage_timing_metrics::{
|
||||
LeaderExecuteAndCommitTimings, RecordTransactionsTimings,
|
||||
},
|
||||
next_leader::{next_leader_tpu_forwards, next_leader_tpu_vote},
|
||||
packet_deserializer::PacketDeserializer,
|
||||
qos_service::QosService,
|
||||
sigverify::SigverifyTracerPacketStats,
|
||||
|
@ -31,9 +32,7 @@ use {
|
|||
itertools::Itertools,
|
||||
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
|
||||
solana_entry::entry::hash_transactions,
|
||||
solana_gossip::{
|
||||
cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo,
|
||||
},
|
||||
solana_gossip::cluster_info::ClusterInfo,
|
||||
solana_ledger::{
|
||||
blockstore_processor::TransactionStatusSender, token_balances::collect_token_balances,
|
||||
},
|
||||
|
@ -78,7 +77,7 @@ use {
|
|||
cmp,
|
||||
collections::HashMap,
|
||||
env,
|
||||
net::{SocketAddr, UdpSocket},
|
||||
net::UdpSocket,
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicUsize, Ordering},
|
||||
Arc, RwLock,
|
||||
|
@ -1816,48 +1815,6 @@ impl BankingStage {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn next_leader_tpu(
|
||||
cluster_info: &ClusterInfo,
|
||||
poh_recorder: &RwLock<PohRecorder>,
|
||||
) -> Option<(Pubkey, std::net::SocketAddr)> {
|
||||
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu)
|
||||
}
|
||||
|
||||
fn next_leader_tpu_forwards(
|
||||
cluster_info: &ClusterInfo,
|
||||
poh_recorder: &RwLock<PohRecorder>,
|
||||
) -> Option<(Pubkey, std::net::SocketAddr)> {
|
||||
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_forwards)
|
||||
}
|
||||
|
||||
pub(crate) fn next_leader_tpu_vote(
|
||||
cluster_info: &ClusterInfo,
|
||||
poh_recorder: &RwLock<PohRecorder>,
|
||||
) -> Option<(Pubkey, std::net::SocketAddr)> {
|
||||
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_vote)
|
||||
}
|
||||
|
||||
fn next_leader_x<F>(
|
||||
cluster_info: &ClusterInfo,
|
||||
poh_recorder: &RwLock<PohRecorder>,
|
||||
port_selector: F,
|
||||
) -> Option<(Pubkey, std::net::SocketAddr)>
|
||||
where
|
||||
F: FnOnce(&ContactInfo) -> SocketAddr,
|
||||
{
|
||||
let leader_pubkey = poh_recorder
|
||||
.read()
|
||||
.unwrap()
|
||||
.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET);
|
||||
if let Some(leader_pubkey) = leader_pubkey {
|
||||
cluster_info
|
||||
.lookup_contact_info(&leader_pubkey, port_selector)
|
||||
.map(|addr| (leader_pubkey, addr))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use {
|
||||
|
@ -1866,7 +1823,9 @@ mod tests {
|
|||
crossbeam_channel::{unbounded, Receiver},
|
||||
solana_address_lookup_table_program::state::{AddressLookupTable, LookupTableMeta},
|
||||
solana_entry::entry::{next_entry, next_versioned_entry, Entry, EntrySlice},
|
||||
solana_gossip::cluster_info::Node,
|
||||
solana_gossip::{
|
||||
cluster_info::Node, legacy_contact_info::LegacyContactInfo as ContactInfo,
|
||||
},
|
||||
solana_ledger::{
|
||||
blockstore::{entries_to_test_shreds, Blockstore},
|
||||
genesis_utils::{create_genesis_config, GenesisConfigInfo},
|
||||
|
|
|
@ -38,6 +38,7 @@ pub mod leader_slot_banking_stage_timing_metrics;
|
|||
pub mod ledger_cleanup_service;
|
||||
pub mod ledger_metric_report_service;
|
||||
pub mod multi_iterator_scanner;
|
||||
pub mod next_leader;
|
||||
pub mod optimistic_confirmation_verifier;
|
||||
pub mod outstanding_requests;
|
||||
pub mod packet_deserializer;
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
use {
|
||||
solana_gossip::{
|
||||
cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo,
|
||||
},
|
||||
solana_poh::poh_recorder::PohRecorder,
|
||||
solana_sdk::{clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, pubkey::Pubkey},
|
||||
std::{net::SocketAddr, sync::RwLock},
|
||||
};
|
||||
|
||||
pub(crate) fn next_leader_tpu(
|
||||
cluster_info: &ClusterInfo,
|
||||
poh_recorder: &RwLock<PohRecorder>,
|
||||
) -> Option<(Pubkey, SocketAddr)> {
|
||||
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu)
|
||||
}
|
||||
|
||||
pub(crate) fn next_leader_tpu_forwards(
|
||||
cluster_info: &ClusterInfo,
|
||||
poh_recorder: &RwLock<PohRecorder>,
|
||||
) -> Option<(Pubkey, SocketAddr)> {
|
||||
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_forwards)
|
||||
}
|
||||
|
||||
pub(crate) fn next_leader_tpu_vote(
|
||||
cluster_info: &ClusterInfo,
|
||||
poh_recorder: &RwLock<PohRecorder>,
|
||||
) -> Option<(Pubkey, SocketAddr)> {
|
||||
next_leader_x(cluster_info, poh_recorder, |leader| leader.tpu_vote)
|
||||
}
|
||||
|
||||
fn next_leader_x<F>(
|
||||
cluster_info: &ClusterInfo,
|
||||
poh_recorder: &RwLock<PohRecorder>,
|
||||
port_selector: F,
|
||||
) -> Option<(Pubkey, SocketAddr)>
|
||||
where
|
||||
F: FnOnce(&ContactInfo) -> SocketAddr,
|
||||
{
|
||||
let leader_pubkey = poh_recorder
|
||||
.read()
|
||||
.unwrap()
|
||||
.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET);
|
||||
if let Some(leader_pubkey) = leader_pubkey {
|
||||
cluster_info
|
||||
.lookup_contact_info(&leader_pubkey, port_selector)
|
||||
.map(|addr| (leader_pubkey, addr))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
|
@ -82,9 +82,9 @@ impl VotingService {
|
|||
}
|
||||
|
||||
let pubkey_and_target_address = if send_to_tpu_vote_port {
|
||||
crate::banking_stage::next_leader_tpu_vote(cluster_info, poh_recorder)
|
||||
crate::next_leader::next_leader_tpu_vote(cluster_info, poh_recorder)
|
||||
} else {
|
||||
crate::banking_stage::next_leader_tpu(cluster_info, poh_recorder)
|
||||
crate::next_leader::next_leader_tpu(cluster_info, poh_recorder)
|
||||
};
|
||||
let _ = cluster_info.send_transaction(
|
||||
vote_op.tx(),
|
||||
|
|
Loading…
Reference in New Issue