TransactionScheduler: CLI and hookup for central-scheduler (#33890)

This commit is contained in:
Andrew Fitzgerald 2023-11-13 22:18:54 +08:00 committed by GitHub
parent ae30572585
commit 81a007b3c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 219 additions and 47 deletions

View File

@ -16,17 +16,26 @@ use {
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
},
crate::{
banking_trace::BankingPacketReceiver, tracer_packet_stats::TracerPacketStats,
banking_stage::{
consume_worker::ConsumeWorker,
packet_deserializer::PacketDeserializer,
transaction_scheduler::{
prio_graph_scheduler::PrioGraphScheduler,
scheduler_controller::SchedulerController, scheduler_error::SchedulerError,
},
},
banking_trace::BankingPacketReceiver,
tracer_packet_stats::TracerPacketStats,
validator::BlockProductionMethod,
},
crossbeam_channel::RecvTimeoutError,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
histogram::Histogram,
solana_client::connection_cache::ConnectionCache,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::blockstore_processor::TransactionStatusSender,
solana_measure::{measure, measure_us},
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
solana_poh::poh_recorder::PohRecorder,
solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache},
solana_sdk::timing::AtomicInterval,
solana_vote::vote_sender_types::ReplayVoteSender,
@ -378,6 +387,20 @@ impl BankingStage {
prioritization_fee_cache,
)
}
BlockProductionMethod::CentralScheduler => Self::new_central_scheduler(
cluster_info,
poh_recorder,
non_vote_receiver,
tpu_vote_receiver,
gossip_vote_receiver,
num_threads,
transaction_status_sender,
replay_vote_sender,
log_messages_bytes_limit,
connection_cache,
bank_forks,
prioritization_fee_cache,
),
}
}
@ -405,6 +428,15 @@ impl BankingStage {
TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize);
// Keeps track of extraneous vote transactions for the vote threads
let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());
let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
let committer = Committer::new(
transaction_status_sender.clone(),
replay_vote_sender.clone(),
prioritization_fee_cache.clone(),
);
let transaction_recorder = poh_recorder.read().unwrap().new_recorder();
// Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
.map(|id| {
@ -432,16 +464,6 @@ impl BankingStage {
),
};
let mut packet_receiver =
PacketReceiver::new(id, packet_receiver, bank_forks.clone());
let poh_recorder = poh_recorder.clone();
let committer = Committer::new(
transaction_status_sender.clone(),
replay_vote_sender.clone(),
prioritization_fee_cache.clone(),
);
let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
let forwarder = Forwarder::new(
poh_recorder.clone(),
bank_forks.clone(),
@ -449,31 +471,175 @@ impl BankingStage {
connection_cache.clone(),
data_budget.clone(),
);
let consumer = Consumer::new(
committer,
poh_recorder.read().unwrap().new_recorder(),
QosService::new(id),
log_messages_bytes_limit,
);
Builder::new()
.name(format!("solBanknStgTx{id:02}"))
.spawn(move || {
Self::process_loop(
&mut packet_receiver,
&decision_maker,
&forwarder,
&consumer,
id,
unprocessed_transaction_storage,
);
})
.unwrap()
Self::spawn_thread_local_multi_iterator_thread(
id,
packet_receiver,
bank_forks.clone(),
decision_maker.clone(),
committer.clone(),
transaction_recorder.clone(),
log_messages_bytes_limit,
forwarder,
unprocessed_transaction_storage,
)
})
.collect();
Self { bank_thread_hdls }
}
/// Builds a `BankingStage` for the central-scheduler block-production method.
///
/// Threads are spawned, and their join handles stored, in this order:
/// 1. two legacy vote threads (id 0 for gossip votes, id 1 for TPU votes), each started via
///    `spawn_thread_local_multi_iterator_thread` with vote storage;
/// 2. one `ConsumeWorker` thread per `num_threads - NUM_VOTE_PROCESSING_THREADS`, with ids
///    offset by `NUM_VOTE_PROCESSING_THREADS`;
/// 3. one scheduler thread that reads `non_vote_receiver` and dispatches work to the workers
///    over per-worker unbounded channels.
///
/// # Panics
///
/// Panics if `num_threads < MIN_TOTAL_THREADS`, if the `poh_recorder` lock is poisoned, or if
/// the OS refuses to spawn a thread.
#[allow(clippy::too_many_arguments)]
pub fn new_central_scheduler(
    cluster_info: &Arc<ClusterInfo>,
    poh_recorder: &Arc<RwLock<PohRecorder>>,
    non_vote_receiver: BankingPacketReceiver,
    tpu_vote_receiver: BankingPacketReceiver,
    gossip_vote_receiver: BankingPacketReceiver,
    num_threads: u32,
    transaction_status_sender: Option<TransactionStatusSender>,
    replay_vote_sender: ReplayVoteSender,
    log_messages_bytes_limit: Option<usize>,
    connection_cache: Arc<ConnectionCache>,
    bank_forks: Arc<RwLock<BankForks>>,
    prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
) -> Self {
    assert!(num_threads >= MIN_TOTAL_THREADS);
    // Shared by the forwarders of both vote threads.
    let data_budget = Arc::new(DataBudget::default());
    // Keeps track of extraneous vote transactions for the vote threads
    let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new());

    let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
    let committer = Committer::new(
        transaction_status_sender.clone(),
        replay_vote_sender.clone(),
        prioritization_fee_cache.clone(),
    );
    let transaction_recorder = poh_recorder.read().unwrap().new_recorder();

    // + 1 for the central scheduler thread
    let mut bank_thread_hdls = Vec::with_capacity(num_threads as usize + 1);

    // Spawn legacy voting threads first: 1 gossip, 1 tpu
    for (id, packet_receiver, vote_source) in [
        (0, gossip_vote_receiver, VoteSource::Gossip),
        (1, tpu_vote_receiver, VoteSource::Tpu),
    ] {
        bank_thread_hdls.push(Self::spawn_thread_local_multi_iterator_thread(
            id,
            packet_receiver,
            bank_forks.clone(),
            decision_maker.clone(),
            committer.clone(),
            transaction_recorder.clone(),
            log_messages_bytes_limit,
            Forwarder::new(
                poh_recorder.clone(),
                bank_forks.clone(),
                cluster_info.clone(),
                connection_cache.clone(),
                data_budget.clone(),
            ),
            UnprocessedTransactionStorage::new_vote_storage(
                latest_unprocessed_votes.clone(),
                vote_source,
            ),
        ));
    }

    // Create channels for communication between scheduler and workers
    let num_workers = (num_threads).saturating_sub(NUM_VOTE_PROCESSING_THREADS);
    let (work_senders, work_receivers): (Vec<Sender<_>>, Vec<Receiver<_>>) =
        (0..num_workers).map(|_| unbounded()).unzip();
    let (finished_work_sender, finished_work_receiver) = unbounded();

    // Spawn the worker threads
    for (index, work_receiver) in work_receivers.into_iter().enumerate() {
        let id = (index as u32).saturating_add(NUM_VOTE_PROCESSING_THREADS);

        // Take the PoH read lock exactly once and drop the guard before going on.
        // Calling `read()` twice inside a single statement keeps the first guard alive
        // (temporaries live until the end of the statement) while the second is acquired;
        // std documents that re-locking an `RwLock` already held by the current thread
        // may deadlock or panic.
        let (worker_recorder, leader_bank_notifier) = {
            let poh_guard = poh_recorder.read().unwrap();
            (
                poh_guard.new_recorder(),
                poh_guard.new_leader_bank_notifier(),
            )
        };

        let consume_worker = ConsumeWorker::new(
            work_receiver,
            Consumer::new(
                committer.clone(),
                worker_recorder,
                QosService::new(id),
                log_messages_bytes_limit,
            ),
            finished_work_sender.clone(),
            leader_bank_notifier,
        );

        bank_thread_hdls.push(
            Builder::new()
                .name(format!("solCoWorker{id:02}"))
                .spawn(move || {
                    // The worker's result is intentionally discarded here.
                    // NOTE(review): consider logging the error like the scheduler thread does.
                    let _ = consume_worker.run();
                })
                .unwrap(),
        );
    }

    // Spawn the central scheduler thread
    bank_thread_hdls.push({
        let packet_deserializer =
            PacketDeserializer::new(non_vote_receiver, bank_forks.clone());
        let scheduler = PrioGraphScheduler::new(work_senders, finished_work_receiver);
        let scheduler_controller = SchedulerController::new(
            decision_maker.clone(),
            packet_deserializer,
            bank_forks,
            scheduler,
        );
        Builder::new()
            .name("solBnkTxSched".to_string())
            .spawn(move || match scheduler_controller.run() {
                Ok(_) => {}
                // A disconnected receive channel ends the thread without a warning.
                Err(SchedulerError::DisconnectedRecvChannel(_)) => {}
                Err(SchedulerError::DisconnectedSendChannel(_)) => {
                    warn!("Unexpected worker disconnect from scheduler")
                }
            })
            .unwrap()
    });

    Self { bank_thread_hdls }
}
/// Spawns a single banking thread named `solBanknStgTx{id:02}` that runs
/// `Self::process_loop` with the given decision maker, forwarder and storage.
///
/// A `PacketReceiver` is built from `id`, `packet_receiver` and `bank_forks`, and a `Consumer`
/// from `committer`, `transaction_recorder`, a `QosService` for `id` and
/// `log_messages_bytes_limit`; both are moved into the new thread.
///
/// # Panics
///
/// Panics if the thread cannot be spawned.
fn spawn_thread_local_multi_iterator_thread(
    id: u32,
    packet_receiver: BankingPacketReceiver,
    bank_forks: Arc<RwLock<BankForks>>,
    decision_maker: DecisionMaker,
    committer: Committer,
    transaction_recorder: TransactionRecorder,
    log_messages_bytes_limit: Option<usize>,
    forwarder: Forwarder,
    unprocessed_transaction_storage: UnprocessedTransactionStorage,
) -> JoinHandle<()> {
    let thread_name = format!("solBanknStgTx{id:02}");

    // Construction order matches the original: receiver first, then consumer.
    let mut receiver = PacketReceiver::new(id, packet_receiver, bank_forks);
    let qos_service = QosService::new(id);
    let thread_consumer = Consumer::new(
        committer,
        transaction_recorder,
        qos_service,
        log_messages_bytes_limit,
    );

    let thread_builder = Builder::new().name(thread_name);
    let handle = thread_builder.spawn(move || {
        Self::process_loop(
            &mut receiver,
            &decision_maker,
            &forwarder,
            &thread_consumer,
            id,
            unprocessed_transaction_storage,
        )
    });
    handle.unwrap()
}
#[allow(clippy::too_many_arguments)]
fn process_buffered_packets(
decision_maker: &DecisionMaker,
@ -793,8 +959,7 @@ mod tests {
with_vers.into_iter().map(|(b, _)| b).collect()
}
#[test]
fn test_banking_stage_entries_only() {
fn test_banking_stage_entries_only(block_production_method: BlockProductionMethod) {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
@ -829,7 +994,7 @@ mod tests {
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let banking_stage = BankingStage::new(
BlockProductionMethod::ThreadLocalMultiIterator,
block_production_method,
&cluster_info,
&poh_recorder,
non_vote_receiver,
@ -922,6 +1087,16 @@ mod tests {
Blockstore::destroy(ledger_path.path()).unwrap();
}
// Runs the shared entries-only scenario with the thread-local multi-iterator method.
#[test]
fn test_banking_stage_entries_only_thread_local_multi_iterator() {
    let method = BlockProductionMethod::ThreadLocalMultiIterator;
    test_banking_stage_entries_only(method);
}
// Runs the shared entries-only scenario with the central-scheduler method.
#[test]
fn test_banking_stage_entries_only_central_scheduler() {
    let method = BlockProductionMethod::CentralScheduler;
    test_banking_stage_entries_only(method);
}
#[test]
fn test_banking_stage_entryfication() {
solana_logger::setup();

View File

@ -36,6 +36,7 @@ pub(super) struct PreBalanceInfo {
pub mint_decimals: HashMap<Pubkey, u8>,
}
#[derive(Clone)]
pub struct Committer {
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,

View File

@ -28,6 +28,7 @@ impl BufferedPacketsDecision {
}
}
#[derive(Clone)]
pub struct DecisionMaker {
my_pubkey: Pubkey,
poh_recorder: Arc<RwLock<PohRecorder>>,

View File

@ -1,19 +1,13 @@
mod batch_id_generator;
#[allow(dead_code)]
mod in_flight_tracker;
pub(crate) mod prio_graph_scheduler;
pub(crate) mod scheduler_controller;
pub(crate) mod scheduler_error;
mod thread_aware_account_locks;
mod transaction_id_generator;
mod transaction_priority_id;
#[allow(dead_code)]
mod transaction_state;
#[allow(dead_code)]
mod transaction_state_container;
mod batch_id_generator;
#[allow(dead_code)]
mod in_flight_tracker;
#[allow(dead_code)]
mod prio_graph_scheduler;
#[allow(dead_code)]
mod scheduler_controller;
mod scheduler_error;
#[allow(dead_code)]
mod transaction_id_generator;

View File

@ -168,6 +168,7 @@ impl BlockVerificationMethod {
pub enum BlockProductionMethod {
#[default]
ThreadLocalMultiIterator,
CentralScheduler,
}
impl BlockProductionMethod {