Reduce the default number of IP echo server threads (#354)

The IP echo server currently spins up a worker thread for every thread
on the machine. Observing some data for nodes,
- MNB validators and RPC nodes look to get several hundred of these
  requests per day
- MNB entrypoint nodes look to get 2-3 requests per second on average

In both instances, the current threadpool is severely overprovisioned
which is a waste of resources. This PR plumnbs a flag to control the
number of worker threads for this pool as well as setting a default of
two threads for this server. Two threads allow for one thread to always
listen on the TCP port while the other thread processes requests
This commit is contained in:
steviez 2024-04-01 10:24:59 -05:00 committed by GitHub
parent 92c9b45479
commit 79e316eb56
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 72 additions and 6 deletions

1
Cargo.lock generated
View File

@ -6542,6 +6542,7 @@ dependencies = [
"solana-logger",
"solana-sdk",
"solana-version",
"static_assertions",
"tokio",
"url 2.5.0",
]

View File

@ -269,6 +269,7 @@ pub struct ValidatorConfig {
pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
pub wen_restart_proto_path: Option<PathBuf>,
pub unified_scheduler_handler_threads: Option<usize>,
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}
@ -338,6 +339,7 @@ impl Default for ValidatorConfig {
use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
wen_restart_proto_path: None,
unified_scheduler_handler_threads: None,
ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
}
@ -1079,6 +1081,7 @@ impl Validator {
None => None,
Some(tcp_listener) => Some(solana_net_utils::ip_echo_server(
tcp_listener,
config.ip_echo_server_threads,
Some(node.info.shred_version()),
)),
};

View File

@ -7,6 +7,7 @@ use {
solana_client::{
connection_cache::ConnectionCache, rpc_client::RpcClient, tpu_client::TpuClientWrapper,
},
solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS,
solana_perf::recycler::Recycler,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
@ -159,8 +160,14 @@ pub fn discover(
if let Some(my_gossip_addr) = my_gossip_addr {
info!("Gossip Address: {:?}", my_gossip_addr);
}
let _ip_echo_server = ip_echo
.map(|tcp_listener| solana_net_utils::ip_echo_server(tcp_listener, Some(my_shred_version)));
let _ip_echo_server = ip_echo.map(|tcp_listener| {
solana_net_utils::ip_echo_server(
tcp_listener,
DEFAULT_IP_ECHO_SERVER_THREADS,
Some(my_shred_version),
)
});
let (met_criteria, elapsed, all_peers, tvu_peers) = spy(
spy_ref.clone(),
num_nodes,

View File

@ -68,6 +68,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
wen_restart_proto_path: config.wen_restart_proto_path.clone(),
unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
ip_echo_server_threads: config.ip_echo_server_threads,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
}

View File

@ -22,6 +22,7 @@ socket2 = { workspace = true }
solana-logger = { workspace = true }
solana-sdk = { workspace = true }
solana-version = { workspace = true }
static_assertions = { workspace = true }
tokio = { workspace = true, features = ["full"] }
url = { workspace = true }

View File

@ -1,5 +1,6 @@
use {
clap::{Arg, Command},
solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS,
std::net::{Ipv4Addr, SocketAddr, TcpListener},
};
@ -21,7 +22,11 @@ fn main() {
.unwrap_or_else(|_| panic!("Unable to parse {port}"));
let bind_addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, port));
let tcp_listener = TcpListener::bind(bind_addr).expect("unable to start tcp listener");
let _runtime = solana_net_utils::ip_echo_server(tcp_listener, /*shred_version=*/ None);
let _runtime = solana_net_utils::ip_echo_server(
tcp_listener,
DEFAULT_IP_ECHO_SERVER_THREADS,
/*shred_version=*/ None,
);
loop {
std::thread::park();
}

View File

@ -6,6 +6,7 @@ use {
std::{
io,
net::{IpAddr, SocketAddr},
num::NonZeroUsize,
time::Duration,
},
tokio::{
@ -18,6 +19,14 @@ use {
pub type IpEchoServer = Runtime;
// Enforce a minimum of two threads:
// - One thread to monitor the TcpListener and spawn async tasks
// - One thread to service the spawned tasks
// The unsafe is safe because we're using a fixed, known non-zero value
pub const MINIMUM_IP_ECHO_SERVER_THREADS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(2) };
// IP echo requests require little computation and come in fairly infrequently,
// so keep the number of server workers small to avoid overhead
pub const DEFAULT_IP_ECHO_SERVER_THREADS: NonZeroUsize = MINIMUM_IP_ECHO_SERVER_THREADS;
pub const MAX_PORT_COUNT_PER_MESSAGE: usize = 4;
const IO_TIMEOUT: Duration = Duration::from_secs(5);
@ -168,6 +177,7 @@ async fn run_echo_server(tcp_listener: std::net::TcpListener, shred_version: Opt
/// connects. Used by |get_public_ip_addr|
pub fn ip_echo_server(
tcp_listener: std::net::TcpListener,
num_server_threads: NonZeroUsize,
// Cluster shred-version of the node running the server.
shred_version: Option<u16>,
) -> IpEchoServer {
@ -175,6 +185,7 @@ pub fn ip_echo_server(
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("solIpEchoSrvrRt")
.worker_threads(num_server_threads.get())
.enable_all()
.build()
.expect("new tokio runtime");

View File

@ -16,7 +16,10 @@ use {
};
mod ip_echo_server;
pub use ip_echo_server::{ip_echo_server, IpEchoServer, MAX_PORT_COUNT_PER_MESSAGE};
pub use ip_echo_server::{
ip_echo_server, IpEchoServer, DEFAULT_IP_ECHO_SERVER_THREADS, MAX_PORT_COUNT_PER_MESSAGE,
MINIMUM_IP_ECHO_SERVER_THREADS,
};
use ip_echo_server::{IpEchoServerMessage, IpEchoServerResponse};
/// A data type representing a public Udp socket
@ -744,7 +747,11 @@ mod tests {
let (_server_port, (server_udp_socket, server_tcp_listener)) =
bind_common_in_range(ip_addr, (3200, 3250)).unwrap();
let _runtime = ip_echo_server(server_tcp_listener, /*shred_version=*/ Some(42));
let _runtime = ip_echo_server(
server_tcp_listener,
DEFAULT_IP_ECHO_SERVER_THREADS,
/*shred_version=*/ Some(42),
);
let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
assert_eq!(
@ -764,7 +771,11 @@ mod tests {
let (client_port, (client_udp_socket, client_tcp_listener)) =
bind_common_in_range(ip_addr, (3200, 3250)).unwrap();
let _runtime = ip_echo_server(server_tcp_listener, /*shred_version=*/ Some(65535));
let _runtime = ip_echo_server(
server_tcp_listener,
DEFAULT_IP_ECHO_SERVER_THREADS,
/*shred_version=*/ Some(65535),
);
let ip_echo_server_addr = server_udp_socket.local_addr().unwrap();
assert_eq!(

View File

@ -5323,6 +5323,7 @@ dependencies = [
"solana-logger",
"solana-sdk",
"solana-version",
"static_assertions",
"tokio",
"url 2.5.0",
]

View File

@ -9,6 +9,7 @@ use {
// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's
pub struct DefaultThreadArgs {
pub ip_echo_server_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
}
@ -16,6 +17,7 @@ pub struct DefaultThreadArgs {
impl Default for DefaultThreadArgs {
fn default() -> Self {
Self {
ip_echo_server_threads: IpEchoServerThreadsArg::default().to_string(),
replay_forks_threads: ReplayForksThreadsArg::default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::default().to_string(),
}
@ -24,6 +26,7 @@ impl Default for DefaultThreadArgs {
pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
vec![
new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
]
@ -41,12 +44,18 @@ fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
}
pub struct NumThreadConfig {
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}
pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
NumThreadConfig {
ip_echo_server_threads: value_t_or_exit!(
matches,
IpEchoServerThreadsArg::NAME,
NonZeroUsize
),
replay_forks_threads: if matches.is_present("replay_slots_concurrently") {
NonZeroUsize::new(4).expect("4 is non-zero")
} else {
@ -86,6 +95,20 @@ trait ThreadArg {
}
}
struct IpEchoServerThreadsArg;
impl ThreadArg for IpEchoServerThreadsArg {
const NAME: &'static str = "ip_echo_server_threads";
const LONG_NAME: &'static str = "ip-echo-server-threads";
const HELP: &'static str = "Number of threads to use for the IP echo server";
fn default() -> usize {
solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS.get()
}
fn min() -> usize {
solana_net_utils::MINIMUM_IP_ECHO_SERVER_THREADS.get()
}
}
struct ReplayForksThreadsArg;
impl ThreadArg for ReplayForksThreadsArg {
const NAME: &'static str = "replay_forks_threads";

View File

@ -1332,6 +1332,7 @@ pub fn main() {
let full_api = matches.is_present("full_rpc_api");
let cli::thread_args::NumThreadConfig {
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
} = cli::thread_args::parse_num_threads_args(&matches);
@ -1474,6 +1475,7 @@ pub fn main() {
use_snapshot_archives_at_startup::cli::NAME,
UseSnapshotArchivesAtStartup
),
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
..ValidatorConfig::default()