Weight concurrent streams by stake (#25993)

Weight concurrent streams by stake for staked nodes
Ported changes from #25056 after addressing merge conflicts and some refactoring
This commit is contained in:
Lijun Wang 2022-06-21 12:06:44 -07:00 committed by GitHub
parent 43e0d29b18
commit 61946a49c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 68 additions and 30 deletions

View File

@ -15,7 +15,9 @@ use {
},
solana_measure::measure::Measure,
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_sdk::quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS},
solana_sdk::quic::{
QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{atomic::Ordering, Arc},
@ -395,7 +397,7 @@ impl QuicClient {
let chunks = buffers[1..buffers.len()]
.iter()
.chunks(QUIC_MAX_CONCURRENT_STREAMS);
.chunks(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS);
let futures: Vec<_> = chunks
.into_iter()

View File

@ -8,9 +8,11 @@ mod tests {
tpu_connection::TpuConnection,
},
solana_sdk::{packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::quic::{spawn_server, StreamStats},
solana_streamer::{
quic::{spawn_server, StreamStats},
streamer::StakedNodes,
},
std::{
collections::HashMap,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
@ -28,7 +30,7 @@ mod tests {
let (sender, receiver) = unbounded();
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s.try_clone().unwrap(),

View File

@ -3,7 +3,7 @@ use {
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_sdk::timing::timestamp,
solana_streamer::streamer::{self, StreamerError},
solana_streamer::streamer::{self, StakedNodes, StreamerError},
std::{
collections::HashMap,
net::IpAddr,
@ -79,7 +79,7 @@ impl FindPacketSenderStakeStage {
pub fn new(
packet_receiver: streamer::PacketBatchReceiver,
sender: FindPacketSenderStakeSender,
staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
staked_nodes: Arc<RwLock<StakedNodes>>,
name: &'static str,
) -> Self {
let mut stats = FindPacketSenderStakeStats::default();
@ -105,7 +105,7 @@ impl FindPacketSenderStakeStage {
Measure::start("apply_sender_stakes_time");
let mut apply_stake = || {
let ip_to_stake = staked_nodes.read().unwrap();
Self::apply_sender_stakes(&mut batches, &ip_to_stake);
Self::apply_sender_stakes(&mut batches, &ip_to_stake.stake_map);
};
apply_stake();
apply_sender_stakes_time.stop();

View File

@ -1,6 +1,7 @@
use {
solana_gossip::cluster_info::ClusterInfo,
solana_runtime::bank_forks::BankForks,
solana_streamer::streamer::StakedNodes,
std::{
collections::HashMap,
net::IpAddr,
@ -24,7 +25,7 @@ impl StakedNodesUpdaterService {
exit: Arc<AtomicBool>,
cluster_info: Arc<ClusterInfo>,
bank_forks: Arc<RwLock<BankForks>>,
shared_staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
shared_staked_nodes: Arc<RwLock<StakedNodes>>,
) -> Self {
let thread_hdl = Builder::new()
.name("sol-sn-updater".to_string())
@ -32,14 +33,17 @@ impl StakedNodesUpdaterService {
let mut last_stakes = Instant::now();
while !exit.load(Ordering::Relaxed) {
let mut new_ip_to_stake = HashMap::new();
let mut total_stake = 0;
if Self::try_refresh_ip_to_stake(
&mut last_stakes,
&mut new_ip_to_stake,
&mut total_stake,
&bank_forks,
&cluster_info,
) {
let mut shared = shared_staked_nodes.write().unwrap();
*shared = new_ip_to_stake;
shared.total_stake = total_stake as f64;
shared.stake_map = new_ip_to_stake;
}
}
})
@ -51,12 +55,17 @@ impl StakedNodesUpdaterService {
fn try_refresh_ip_to_stake(
last_stakes: &mut Instant,
ip_to_stake: &mut HashMap<IpAddr, u64>,
total_stake: &mut u64,
bank_forks: &RwLock<BankForks>,
cluster_info: &ClusterInfo,
) -> bool {
if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION {
let root_bank = bank_forks.read().unwrap().root_bank();
let staked_nodes = root_bank.staked_nodes();
*total_stake = staked_nodes
.iter()
.map(|(_pubkey, stake)| stake)
.sum::<u64>();
*ip_to_stake = cluster_info
.tvu_peers()
.into_iter()

View File

@ -30,11 +30,11 @@ use {
vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
},
solana_sdk::signature::Keypair,
solana_streamer::quic::{
spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS,
solana_streamer::{
quic::{spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
std::{
collections::HashMap,
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
thread,
@ -127,7 +127,7 @@ impl Tpu {
Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
);
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let staked_nodes_updater_service = StakedNodesUpdaterService::new(
exit.clone(),
cluster_info.clone(),

View File

@ -2,7 +2,7 @@ pub const QUIC_PORT_OFFSET: u16 = 6;
// Empirically found max number of concurrent streams
// that seems to maximize TPS on GCE (higher values don't seem to
// give significant improvement or seem to impact stability)
pub const QUIC_MAX_CONCURRENT_STREAMS: usize = 2048;
pub const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128;
pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000;
pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000;

View File

@ -1,11 +1,15 @@
use {
crate::quic::{configure_server, QuicServerError, StreamStats},
crate::{
quic::{configure_server, QuicServerError, StreamStats},
streamer::StakedNodes,
},
crossbeam_channel::Sender,
futures_util::stream::StreamExt,
quinn::{Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection},
quinn::{Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection, VarInt},
solana_perf::packet::PacketBatch,
solana_sdk::{
packet::{Packet, PACKET_DATA_SIZE},
quic::QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
signature::Keypair,
timing,
},
@ -21,6 +25,8 @@ use {
tokio::{task::JoinHandle, time::timeout},
};
// Total concurrent-stream budget shared across all staked connections; each
// staked peer is granted a share proportional to stake / total_stake (see the
// set_max_concurrent_uni_streams call below). Kept as f64 for ratio math.
const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: f64 = 100_000f64;
#[allow(clippy::too_many_arguments)]
pub fn spawn_server(
sock: UdpSocket,
@ -29,7 +35,7 @@ pub fn spawn_server(
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
max_connections_per_ip: usize,
staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
@ -59,7 +65,7 @@ pub async fn run_server(
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
max_connections_per_ip: usize,
staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
@ -97,18 +103,30 @@ pub async fn run_server(
let (mut connection_table_l, stake) = {
let staked_nodes = staked_nodes.read().unwrap();
if let Some(stake) = staked_nodes.get(&remote_addr.ip()) {
if let Some(stake) = staked_nodes.stake_map.get(&remote_addr.ip()) {
let stake = *stake;
let total_stake = staked_nodes.total_stake;
drop(staked_nodes);
let mut connection_table_l = staked_connection_table.lock().unwrap();
let num_pruned = connection_table_l.prune_oldest(max_staked_connections);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
connection.set_max_concurrent_uni_streams(
VarInt::from_u64(
((stake as f64 / total_stake as f64)
* QUIC_TOTAL_STAKED_CONCURRENT_STREAMS)
as u64,
)
.unwrap(),
);
(connection_table_l, stake)
} else {
drop(staked_nodes);
let mut connection_table_l = connection_table.lock().unwrap();
let num_pruned = connection_table_l.prune_oldest(max_unstaked_connections);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
connection.set_max_concurrent_uni_streams(
VarInt::from_u64(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u64).unwrap(),
);
(connection_table_l, 0)
}
};
@ -453,7 +471,7 @@ pub mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
@ -652,7 +670,7 @@ pub mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
@ -682,7 +700,7 @@ pub mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,

View File

@ -1,4 +1,5 @@
use {
crate::streamer::StakedNodes,
crossbeam_channel::Sender,
pem::Pem,
pkcs8::{der::Document, AlgorithmIdentifier, ObjectIdentifier},
@ -7,11 +8,10 @@ use {
solana_perf::packet::PacketBatch,
solana_sdk::{
packet::PACKET_DATA_SIZE,
quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS},
quic::{QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS},
signature::Keypair,
},
std::{
collections::HashMap,
error::Error,
net::{IpAddr, UdpSocket},
sync::{
@ -49,7 +49,7 @@ pub(crate) fn configure_server(
let config = Arc::get_mut(&mut server_config.transport).unwrap();
// QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS doubled, which was found to improve reliability
const MAX_CONCURRENT_UNI_STREAMS: u32 = (QUIC_MAX_CONCURRENT_STREAMS * 2) as u32;
const MAX_CONCURRENT_UNI_STREAMS: u32 = (QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS * 2) as u32;
config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
@ -258,7 +258,7 @@ pub fn spawn_server(
packet_sender: Sender<PacketBatch>,
exit: Arc<AtomicBool>,
max_connections_per_ip: usize,
staked_nodes: Arc<RwLock<HashMap<IpAddr, u64>>>,
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
@ -306,7 +306,7 @@ mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
@ -361,7 +361,7 @@ mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,
@ -403,7 +403,7 @@ mod test {
let keypair = Keypair::new();
let ip = "127.0.0.1".parse().unwrap();
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(HashMap::new()));
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
s,

View File

@ -24,6 +24,13 @@ use {
thiserror::Error,
};
/// Snapshot of cluster stake shared between the staked-nodes updater service
/// and consumers such as the QUIC server and packet-sender-stake stages.
#[derive(Default)]
pub struct StakedNodes {
    /// Sum of stake across all staked nodes; stored as f64 because consumers
    /// use it as the denominator in stake-ratio calculations.
    pub total_stake: f64,
    /// Per-node stake, keyed by the node's IP address.
    pub stake_map: HashMap<IpAddr, u64>,
}
pub type PacketBatchReceiver = Receiver<PacketBatch>;
pub type PacketBatchSender = Sender<PacketBatch>;