reworks streamer::StakedNodes (#31082)
{min,max}_stake are computed but never assigned:
https://github.com/solana-labs/solana/blob/4564bcdc1/core/src/staked_nodes_updater_service.rs#L54-L57
The updater code is also inefficient and verbose.
parent 4a157446ac
commit ce21a58b65
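As a rough usage sketch (not part of this commit), the reworked StakedNodes::new derives total_stake and the min/max bounds itself, applies overrides in place of bank stakes, and drops zero-stake entries; the alice/bob keys and the stake amounts below are illustrative placeholders, not values from the diff:

use {
    solana_sdk::pubkey::Pubkey,
    solana_streamer::streamer::StakedNodes,
    std::{collections::HashMap, sync::Arc},
};

fn main() {
    let alice = Pubkey::new_unique();
    let bob = Pubkey::new_unique();
    // Stakes as reported by the root bank.
    let stakes = Arc::new(HashMap::from([(alice, 100u64), (bob, 300u64)]));
    // An override replaces (rather than adds to) a node's bank stake.
    let overrides = HashMap::from([(bob, 50u64)]);
    let staked_nodes = StakedNodes::new(stakes, overrides);
    assert_eq!(staked_nodes.total_stake(), 150); // 100 (alice) + 50 (bob override)
    assert_eq!(staked_nodes.get_node_stake(&alice), Some(100));
    assert_eq!(staked_nodes.get_node_stake(&bob), Some(50));
    // Unknown (or zero-stake) nodes resolve to None.
    assert_eq!(staked_nodes.get_node_stake(&Pubkey::new_unique()), None);
}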
@@ -98,11 +98,14 @@ fn create_connection_cache(
     let (stake, total_stake) =
         find_node_activated_stake(rpc_client, client_node_id.pubkey()).unwrap_or_default();
     info!("Stake for specified client_node_id: {stake}, total stake: {total_stake}");
-    let staked_nodes = Arc::new(RwLock::new(StakedNodes {
-        total_stake,
-        pubkey_stake_map: HashMap::from([(client_node_id.pubkey(), stake)]),
-        ..StakedNodes::default()
-    }));
+    let stakes = HashMap::from([
+        (client_node_id.pubkey(), stake),
+        (Pubkey::new_unique(), total_stake - stake),
+    ]);
+    let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(
+        Arc::new(stakes),
+        HashMap::<Pubkey, u64>::default(), // overrides
+    )));
     ConnectionCache::new_with_client_options(
         tpu_connection_pool_size,
         None,
@@ -1,21 +1,19 @@
 use {
     solana_gossip::cluster_info::ClusterInfo,
     solana_runtime::bank_forks::BankForks,
     solana_sdk::pubkey::Pubkey,
     solana_streamer::streamer::StakedNodes,
     std::{
         collections::HashMap,
-        net::IpAddr,
         sync::{
             atomic::{AtomicBool, Ordering},
-            Arc, RwLock, RwLockReadGuard,
+            Arc, RwLock,
         },
-        thread::{self, sleep, Builder, JoinHandle},
-        time::{Duration, Instant},
+        thread::{self, Builder, JoinHandle},
+        time::Duration,
     },
 };

-const IP_TO_STAKE_REFRESH_DURATION: Duration = Duration::from_secs(5);
+const STAKE_REFRESH_CYCLE: Duration = Duration::from_secs(5);

 pub struct StakedNodesUpdaterService {
     thread_hdl: JoinHandle<()>,
@@ -24,35 +22,21 @@ pub struct StakedNodesUpdaterService {
 impl StakedNodesUpdaterService {
     pub fn new(
         exit: Arc<AtomicBool>,
         cluster_info: Arc<ClusterInfo>,
         bank_forks: Arc<RwLock<BankForks>>,
-        shared_staked_nodes: Arc<RwLock<StakedNodes>>,
-        shared_staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
+        staked_nodes: Arc<RwLock<StakedNodes>>,
+        staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
     ) -> Self {
         let thread_hdl = Builder::new()
             .name("solStakedNodeUd".to_string())
             .spawn(move || {
-                let mut last_stakes = Instant::now();
                 while !exit.load(Ordering::Relaxed) {
-                    let overrides = shared_staked_nodes_overrides.read().unwrap();
-                    let mut new_id_to_stake = HashMap::new();
-                    let mut total_stake = 0;
-                    let mut max_stake: u64 = 0;
-                    let mut min_stake: u64 = u64::MAX;
-                    if Self::try_refresh_stake_maps(
-                        &mut last_stakes,
-                        &mut new_id_to_stake,
-                        &mut total_stake,
-                        &mut max_stake,
-                        &mut min_stake,
-                        &bank_forks,
-                        &cluster_info,
-                        &overrides,
-                    ) {
-                        let mut shared = shared_staked_nodes.write().unwrap();
-                        shared.total_stake = total_stake;
-                        shared.pubkey_stake_map = new_id_to_stake;
-                    }
+                    let stakes = {
+                        let root_bank = bank_forks.read().unwrap().root_bank();
+                        root_bank.staked_nodes()
+                    };
+                    let overrides = staked_nodes_overrides.read().unwrap().clone();
+                    *staked_nodes.write().unwrap() = StakedNodes::new(stakes, overrides);
+                    std::thread::sleep(STAKE_REFRESH_CYCLE);
                 }
             })
             .unwrap();
@@ -60,76 +44,6 @@ impl StakedNodesUpdaterService {
         Self { thread_hdl }
     }

-    fn try_refresh_stake_maps(
-        last_stakes: &mut Instant,
-        id_to_stake: &mut HashMap<Pubkey, u64>,
-        total_stake: &mut u64,
-        max_stake: &mut u64,
-        min_stake: &mut u64,
-        bank_forks: &RwLock<BankForks>,
-        cluster_info: &ClusterInfo,
-        overrides: &RwLockReadGuard<HashMap<Pubkey, u64>>,
-    ) -> bool {
-        if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION {
-            let root_bank = bank_forks.read().unwrap().root_bank();
-            let staked_nodes = root_bank.staked_nodes();
-
-            for stake in staked_nodes.values() {
-                *total_stake += stake;
-                *max_stake = *stake.max(max_stake);
-                *min_stake = *stake.min(min_stake);
-            }
-
-            *id_to_stake = cluster_info
-                .tvu_peers()
-                .into_iter()
-                .filter_map(|node| {
-                    let stake = staked_nodes.get(&node.id)?;
-                    Some((node.id, *stake))
-                })
-                .collect();
-            let my_pubkey = *cluster_info.my_contact_info().pubkey();
-            if let Some(stake) = staked_nodes.get(&my_pubkey) {
-                id_to_stake.insert(my_pubkey, *stake);
-            }
-            Self::override_stake(cluster_info, total_stake, id_to_stake, overrides);
-
-            *last_stakes = Instant::now();
-            true
-        } else {
-            sleep(Duration::from_millis(1));
-            false
-        }
-    }
-
-    fn override_stake(
-        cluster_info: &ClusterInfo,
-        total_stake: &mut u64,
-        id_to_stake_map: &mut HashMap<Pubkey, u64>,
-        staked_map_overrides: &HashMap<Pubkey, u64>,
-    ) {
-        let nodes: HashMap<Pubkey, IpAddr> = cluster_info
-            .all_peers()
-            .into_iter()
-            .map(|(node, _)| (node.id, node.tvu.ip()))
-            .collect();
-        for (id_override, stake_override) in staked_map_overrides {
-            if nodes.contains_key(id_override) {
-                if let Some(previous_stake) = id_to_stake_map.get(id_override) {
-                    *total_stake -= previous_stake;
-                }
-                *total_stake += stake_override;
-                id_to_stake_map.insert(*id_override, *stake_override);
-            } else {
-                error!(
-                    "staked nodes overrides configuration for id \
-                     {id_override} with stake {stake_override} does not \
-                     match existing IP. Skipping",
-                );
-            }
-        }
-    }
-
     pub fn join(self) -> thread::Result<()> {
         self.thread_hdl.join()
     }
@@ -132,7 +132,6 @@ impl Tpu {

         let staked_nodes_updater_service = StakedNodesUpdaterService::new(
             exit.clone(),
             cluster_info.clone(),
             bank_forks.clone(),
             staked_nodes.clone(),
             shared_staked_nodes_overrides,
@@ -122,9 +122,9 @@ impl QuicConfig {
            (ConnectionPeerType::Unstaked, 0, 0),
            |stakes| {
                let rstakes = stakes.read().unwrap();
-                rstakes.pubkey_stake_map.get(&pubkey).map_or(
-                    (ConnectionPeerType::Unstaked, 0, rstakes.total_stake),
-                    |stake| (ConnectionPeerType::Staked, *stake, rstakes.total_stake),
+                rstakes.get_node_stake(&pubkey).map_or(
+                    (ConnectionPeerType::Unstaked, 0, rstakes.total_stake()),
+                    |stake| (ConnectionPeerType::Staked, stake, rstakes.total_stake()),
                )
            },
        )
@@ -233,6 +233,7 @@ mod tests {
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
            QUIC_TOTAL_STAKED_CONCURRENT_STREAMS,
        },
+        std::collections::HashMap,
    };

    #[test]
@@ -252,19 +253,18 @@ mod tests {
            connection_config.compute_max_parallel_streams(),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );

-        staked_nodes.write().unwrap().total_stake = 10000;
+        let overrides = HashMap::<Pubkey, u64>::default();
+        let mut stakes = HashMap::from([(Pubkey::new_unique(), 10_000)]);
+        *staked_nodes.write().unwrap() =
+            StakedNodes::new(Arc::new(stakes.clone()), overrides.clone());
        assert_eq!(
            connection_config.compute_max_parallel_streams(),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );

-        staked_nodes
-            .write()
-            .unwrap()
-            .pubkey_stake_map
-            .insert(pubkey, 1);
-
+        stakes.insert(pubkey, 1);
+        *staked_nodes.write().unwrap() =
+            StakedNodes::new(Arc::new(stakes.clone()), overrides.clone());
        let delta =
            (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64;

@@ -272,17 +272,8 @@ mod tests {
            connection_config.compute_max_parallel_streams(),
            (QUIC_MIN_STAKED_CONCURRENT_STREAMS as f64 + (1f64 / 10000f64) * delta) as usize
        );

-        staked_nodes
-            .write()
-            .unwrap()
-            .pubkey_stake_map
-            .remove(&pubkey);
-        staked_nodes
-            .write()
-            .unwrap()
-            .pubkey_stake_map
-            .insert(pubkey, 1000);
+        stakes.insert(pubkey, 1_000);
+        *staked_nodes.write().unwrap() = StakedNodes::new(Arc::new(stakes.clone()), overrides);
        assert_ne!(
            connection_config.compute_max_parallel_streams(),
            QUIC_MIN_STAKED_CONCURRENT_STREAMS
@@ -211,10 +211,10 @@ fn get_connection_stake(
    let staked_nodes = staked_nodes.read().unwrap();
    Some((
        pubkey,
-        staked_nodes.pubkey_stake_map.get(&pubkey).copied()?,
-        staked_nodes.total_stake,
-        staked_nodes.max_stake,
-        staked_nodes.min_stake,
+        staked_nodes.get_node_stake(&pubkey)?,
+        staked_nodes.total_stake(),
+        staked_nodes.max_stake(),
+        staked_nodes.min_stake(),
    ))
}

@@ -1102,7 +1102,7 @@ pub mod test {
            signature::Keypair,
            signer::Signer,
        },
-        std::net::Ipv4Addr,
+        std::{collections::HashMap, net::Ipv4Addr},
        tokio::time::sleep,
    };

@@ -1518,12 +1518,11 @@ pub mod test {
        solana_logger::setup();

        let client_keypair = Keypair::new();
-        let mut staked_nodes = StakedNodes::default();
-        staked_nodes
-            .pubkey_stake_map
-            .insert(client_keypair.pubkey(), 100000);
-        staked_nodes.total_stake = 100000;
-
+        let stakes = HashMap::from([(client_keypair.pubkey(), 100_000)]);
+        let staked_nodes = StakedNodes::new(
+            Arc::new(stakes),
+            HashMap::<Pubkey, u64>::default(), // overrides
+        );
        let (t, exit, receiver, server_address, stats) = setup_quic_server(Some(staked_nodes), 1);
        check_multiple_writes(receiver, server_address, Some(&client_keypair)).await;
        exit.store(true, Ordering::Relaxed);
@@ -1545,12 +1544,11 @@ pub mod test {
        solana_logger::setup();

        let client_keypair = Keypair::new();
-        let mut staked_nodes = StakedNodes::default();
-        staked_nodes
-            .pubkey_stake_map
-            .insert(client_keypair.pubkey(), 0);
-        staked_nodes.total_stake = 0;
-
+        let stakes = HashMap::from([(client_keypair.pubkey(), 0)]);
+        let staked_nodes = StakedNodes::new(
+            Arc::new(stakes),
+            HashMap::<Pubkey, u64>::default(), // overrides
+        );
        let (t, exit, receiver, server_address, stats) = setup_quic_server(Some(staked_nodes), 1);
        check_multiple_writes(receiver, server_address, Some(&client_keypair)).await;
        exit.store(true, Ordering::Relaxed);
@@ -9,6 +9,7 @@ use {
    },
    crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
    histogram::Histogram,
+    itertools::Itertools,
    solana_sdk::{packet::Packet, pubkey::Pubkey, timing::timestamp},
    std::{
        cmp::Reverse,
@@ -27,10 +28,11 @@ use {
 // Total stake and nodes => stake map
 #[derive(Default)]
 pub struct StakedNodes {
-    pub total_stake: u64,
-    pub max_stake: u64,
-    pub min_stake: u64,
-    pub pubkey_stake_map: HashMap<Pubkey, u64>,
+    stakes: Arc<HashMap<Pubkey, u64>>,
+    overrides: HashMap<Pubkey, u64>,
+    total_stake: u64,
+    max_stake: u64,
+    min_stake: u64,
 }

 pub type PacketBatchReceiver = Receiver<PacketBatch>;
@@ -292,6 +294,49 @@ impl StreamerSendStats {
     }
 }

+impl StakedNodes {
+    pub fn new(stakes: Arc<HashMap<Pubkey, u64>>, overrides: HashMap<Pubkey, u64>) -> Self {
+        let values = stakes
+            .iter()
+            .filter(|(pubkey, _)| !overrides.contains_key(pubkey))
+            .map(|(_, &stake)| stake)
+            .chain(overrides.values().copied())
+            .filter(|&stake| stake > 0);
+        let total_stake = values.clone().sum();
+        let (min_stake, max_stake) = values.minmax().into_option().unwrap_or_default();
+        Self {
+            stakes,
+            overrides,
+            total_stake,
+            max_stake,
+            min_stake,
+        }
+    }
+
+    pub fn get_node_stake(&self, pubkey: &Pubkey) -> Option<u64> {
+        self.overrides
+            .get(pubkey)
+            .or_else(|| self.stakes.get(pubkey))
+            .filter(|&&stake| stake > 0)
+            .copied()
+    }
+
+    #[inline]
+    pub fn total_stake(&self) -> u64 {
+        self.total_stake
+    }
+
+    #[inline]
+    pub(super) fn min_stake(&self) -> u64 {
+        self.min_stake
+    }
+
+    #[inline]
+    pub(super) fn max_stake(&self) -> u64 {
+        self.max_stake
+    }
+}
+
 fn recv_send(
     sock: &UdpSocket,
     r: &PacketBatchReceiver,