disables turbine and repair QUIC endpoints on mainnet-beta (#34523)

On mainnet-beta, respective QUIC endpoint are unnecessary for now until
testnet has fully migrated to QUIC. The commit disables turbine and
repair QUIC endpoints on mainnet-beta.
This commit is contained in:
behzad nouri 2023-12-19 19:12:18 +00:00 committed by GitHub
parent d00c9a45b2
commit 4feadbdb7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 78 additions and 54 deletions

View File

@ -108,7 +108,7 @@ use {
clock::Slot,
epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
exit::Exit,
genesis_config::GenesisConfig,
genesis_config::{ClusterType, GenesisConfig},
hash::Hash,
pubkey::Pubkey,
shred_version::compute_shred_version,
@ -471,12 +471,12 @@ pub struct Validator {
blockstore_metric_report_service: BlockstoreMetricReportService,
accounts_background_service: AccountsBackgroundService,
accounts_hash_verifier: AccountsHashVerifier,
turbine_quic_endpoint: Endpoint,
turbine_quic_endpoint: Option<Endpoint>,
turbine_quic_endpoint_runtime: Option<TokioRuntime>,
turbine_quic_endpoint_join_handle: solana_turbine::quic_endpoint::AsyncTryJoinHandle,
repair_quic_endpoint: Endpoint,
turbine_quic_endpoint_join_handle: Option<solana_turbine::quic_endpoint::AsyncTryJoinHandle>,
repair_quic_endpoint: Option<Endpoint>,
repair_quic_endpoint_runtime: Option<TokioRuntime>,
repair_quic_endpoint_join_handle: repair::quic_endpoint::AsyncTryJoinHandle,
repair_quic_endpoint_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
}
impl Validator {
@ -1170,58 +1170,74 @@ impl Validator {
// Outside test-validator crate, we always need a tokio runtime (and
// the respective handle) to initialize the turbine QUIC endpoint.
let current_runtime_handle = tokio::runtime::Handle::try_current();
let turbine_quic_endpoint_runtime = current_runtime_handle.is_err().then(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("solTurbineQuic")
.build()
.unwrap()
});
let turbine_quic_endpoint_runtime = (current_runtime_handle.is_err()
&& genesis_config.cluster_type != ClusterType::MainnetBeta)
.then(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("solTurbineQuic")
.build()
.unwrap()
});
let (turbine_quic_endpoint_sender, turbine_quic_endpoint_receiver) = unbounded();
let (
turbine_quic_endpoint,
turbine_quic_endpoint_sender,
turbine_quic_endpoint_join_handle,
) = solana_turbine::quic_endpoint::new_quic_endpoint(
turbine_quic_endpoint_runtime
.as_ref()
.map(TokioRuntime::handle)
.unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
&identity_keypair,
node.sockets.tvu_quic,
node.info
.tvu(Protocol::QUIC)
.map_err(|err| format!("Invalid QUIC TVU address: {err:?}"))?
.ip(),
turbine_quic_endpoint_sender,
bank_forks.clone(),
)
.unwrap();
// Repair quic endpoint.
let repair_quic_endpoint_runtime = current_runtime_handle.is_err().then(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("solRepairQuic")
.build()
.unwrap()
});
let (repair_quic_endpoint, repair_quic_endpoint_sender, repair_quic_endpoint_join_handle) =
repair::quic_endpoint::new_quic_endpoint(
repair_quic_endpoint_runtime
) = if genesis_config.cluster_type == ClusterType::MainnetBeta {
let (sender, _receiver) = tokio::sync::mpsc::channel(1);
(None, sender, None)
} else {
solana_turbine::quic_endpoint::new_quic_endpoint(
turbine_quic_endpoint_runtime
.as_ref()
.map(TokioRuntime::handle)
.unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
&identity_keypair,
node.sockets.serve_repair_quic,
node.sockets.tvu_quic,
node.info
.serve_repair(Protocol::QUIC)
.map_err(|err| format!("Invalid QUIC serve-repair address: {err:?}"))?
.tvu(Protocol::QUIC)
.map_err(|err| format!("Invalid QUIC TVU address: {err:?}"))?
.ip(),
repair_quic_endpoint_sender,
turbine_quic_endpoint_sender,
bank_forks.clone(),
)
.unwrap();
.map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
.unwrap()
};
// Repair quic endpoint.
let repair_quic_endpoint_runtime = (current_runtime_handle.is_err()
&& genesis_config.cluster_type != ClusterType::MainnetBeta)
.then(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("solRepairQuic")
.build()
.unwrap()
});
let (repair_quic_endpoint, repair_quic_endpoint_sender, repair_quic_endpoint_join_handle) =
if genesis_config.cluster_type == ClusterType::MainnetBeta {
let (sender, _receiver) = tokio::sync::mpsc::channel(1);
(None, sender, None)
} else {
repair::quic_endpoint::new_quic_endpoint(
repair_quic_endpoint_runtime
.as_ref()
.map(TokioRuntime::handle)
.unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()),
&identity_keypair,
node.sockets.serve_repair_quic,
node.info
.serve_repair(Protocol::QUIC)
.map_err(|err| format!("Invalid QUIC serve-repair address: {err:?}"))?
.ip(),
repair_quic_endpoint_sender,
bank_forks.clone(),
)
.map(|(endpoint, sender, join_handle)| (Some(endpoint), sender, Some(join_handle)))
.unwrap()
};
let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
let tower = match process_blockstore.process_to_create_tower() {
@ -1514,14 +1530,18 @@ impl Validator {
}
self.gossip_service.join().expect("gossip_service");
repair::quic_endpoint::close_quic_endpoint(&self.repair_quic_endpoint);
if let Some(repair_quic_endpoint) = &self.repair_quic_endpoint {
repair::quic_endpoint::close_quic_endpoint(repair_quic_endpoint);
}
self.serve_repair_service
.join()
.expect("serve_repair_service");
self.repair_quic_endpoint_runtime
.map(|runtime| runtime.block_on(self.repair_quic_endpoint_join_handle))
.transpose()
.unwrap();
if let Some(repair_quic_endpoint_join_handle) = self.repair_quic_endpoint_join_handle {
self.repair_quic_endpoint_runtime
.map(|runtime| runtime.block_on(repair_quic_endpoint_join_handle))
.transpose()
.unwrap();
};
self.stats_reporter_service
.join()
.expect("stats_reporter_service");
@ -1534,13 +1554,17 @@ impl Validator {
self.accounts_hash_verifier
.join()
.expect("accounts_hash_verifier");
solana_turbine::quic_endpoint::close_quic_endpoint(&self.turbine_quic_endpoint);
if let Some(turbine_quic_endpoint) = &self.turbine_quic_endpoint {
solana_turbine::quic_endpoint::close_quic_endpoint(turbine_quic_endpoint);
}
self.tpu.join().expect("tpu");
self.tvu.join().expect("tvu");
self.turbine_quic_endpoint_runtime
.map(|runtime| runtime.block_on(self.turbine_quic_endpoint_join_handle))
.transpose()
.unwrap();
if let Some(turbine_quic_endpoint_join_handle) = self.turbine_quic_endpoint_join_handle {
self.turbine_quic_endpoint_runtime
.map(|runtime| runtime.block_on(turbine_quic_endpoint_join_handle))
.transpose()
.unwrap();
}
self.completed_data_sets_service
.join()
.expect("completed_data_sets_service");