cli: Aggregate cluster info stats by software version (#25103)

* cli: Aggregate cluster info stats by software version

* remove unused itertools dep
This commit is contained in:
Justin Starry 2022-05-11 21:33:22 +08:00 committed by GitHub
parent 11fa0db850
commit 766e361111
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 231 additions and 89 deletions

View File

@ -20,13 +20,7 @@ use {
pubkey::Pubkey,
transaction::Transaction,
},
std::{
cmp::Ordering,
collections::{HashMap, HashSet},
fmt,
str::FromStr,
sync::Arc,
},
std::{cmp::Ordering, collections::HashMap, fmt, str::FromStr, sync::Arc},
};
const DEFAULT_MAX_ACTIVE_DISPLAY_AGE_SLOTS: Slot = 15_000_000; // ~90days
@ -115,6 +109,8 @@ pub struct CliFeatures {
pub feature_activation_allowed: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub cluster_feature_sets: Option<CliClusterFeatureSets>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cluster_software_versions: Option<CliClusterSoftwareVersions>,
#[serde(skip)]
pub inactive: bool,
}
@ -156,6 +152,10 @@ impl fmt::Display for CliFeatures {
)?;
}
if let Some(software_versions) = &self.cluster_software_versions {
write!(f, "{}", software_versions)?;
}
if let Some(feature_sets) = &self.cluster_feature_sets {
write!(f, "{}", feature_sets)?;
}
@ -180,13 +180,86 @@ impl VerboseDisplay for CliFeatures {}
#[serde(rename_all = "camelCase")]
pub struct CliClusterFeatureSets {
pub tool_feature_set: u32,
pub feature_sets: Vec<CliFeatureSet>,
pub feature_sets: Vec<CliFeatureSetStats>,
#[serde(skip)]
pub stake_allowed: bool,
#[serde(skip)]
pub rpc_allowed: bool,
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CliClusterSoftwareVersions {
tool_software_version: CliVersion,
software_versions: Vec<CliSoftwareVersionStats>,
}
impl fmt::Display for CliClusterSoftwareVersions {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let software_version_title = "Software Version";
let stake_percent_title = "Stake";
let rpc_percent_title = "RPC";
let mut max_software_version_len = software_version_title.len();
let mut max_stake_percent_len = stake_percent_title.len();
let mut max_rpc_percent_len = rpc_percent_title.len();
let software_versions: Vec<_> = self
.software_versions
.iter()
.map(|software_version_stats| {
let stake_percent = format!("{:.2}%", software_version_stats.stake_percent);
let rpc_percent = format!("{:.2}%", software_version_stats.rpc_percent);
let software_version = software_version_stats.software_version.to_string();
max_software_version_len = max_software_version_len.max(software_version.len());
max_stake_percent_len = max_stake_percent_len.max(stake_percent.len());
max_rpc_percent_len = max_rpc_percent_len.max(rpc_percent.len());
(software_version, stake_percent, rpc_percent)
})
.collect();
writeln!(
f,
"\n\n{}",
style(format!(
"Tool Software Version: {}",
self.tool_software_version
))
.bold()
)?;
writeln!(
f,
"{}",
style(format!(
"{1:<0$} {3:>2$} {5:>4$}",
max_software_version_len,
software_version_title,
max_stake_percent_len,
stake_percent_title,
max_rpc_percent_len,
rpc_percent_title,
))
.bold(),
)?;
for (software_version, stake_percent, rpc_percent) in software_versions {
let me = self.tool_software_version.to_string() == software_version;
writeln!(
f,
"{1:<0$} {3:>2$} {5:>4$} {6}",
max_software_version_len,
software_version,
max_stake_percent_len,
stake_percent,
max_rpc_percent_len,
rpc_percent,
if me { "<-- me" } else { "" },
)?;
}
writeln!(f)
}
}
impl fmt::Display for CliClusterFeatureSets {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut tool_feature_set_matches_cluster = false;
@ -274,7 +347,7 @@ impl fmt::Display for CliClusterFeatureSets {
f,
"{}",
style(format!(
"{1:<0$} {3:<2$} {5:<4$} {7:<6$}",
"{1:<0$} {3:<2$} {5:>4$} {7:>6$}",
max_software_versions_len,
software_versions_title,
max_feature_set_len,
@ -289,7 +362,7 @@ impl fmt::Display for CliClusterFeatureSets {
for (software_versions, feature_set, stake_percent, rpc_percent, me) in feature_sets {
writeln!(
f,
"{1:<0$} {3:>2$} {5:>4$} {7:>6$} {8}",
"{1:<0$} {3:>2$} {5:>4$} {7:>6$} {8}",
max_software_versions_len,
software_versions,
max_feature_set_len,
@ -310,14 +383,22 @@ impl VerboseDisplay for CliClusterFeatureSets {}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CliFeatureSet {
pub struct CliFeatureSetStats {
software_versions: Vec<CliVersion>,
feature_set: u32,
stake_percent: f64,
rpc_percent: f32,
}
#[derive(Eq, PartialEq, Ord, PartialOrd)]
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CliSoftwareVersionStats {
software_version: CliVersion,
stake_percent: f64,
rpc_percent: f32,
}
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone)]
struct CliVersion(Option<semver::Version>);
impl fmt::Display for CliVersion {
@ -489,25 +570,61 @@ pub fn process_feature_subcommand(
}
}
#[derive(Debug, Default)]
struct WorkingFeatureSetStatsEntry {
stake: u64,
rpc_nodes_count: u32,
software_versions: HashSet<Option<semver::Version>>,
}
type WorkingFeatureSetStats = HashMap<u32, WorkingFeatureSetStatsEntry>;
#[derive(Debug, Default)]
struct FeatureSetStatsEntry {
stake_percent: f64,
rpc_nodes_percent: f32,
software_versions: Vec<Option<semver::Version>>,
software_versions: Vec<CliVersion>,
}
type FeatureSetStats = HashMap<u32, FeatureSetStatsEntry>;
fn feature_set_stats(rpc_client: &RpcClient) -> Result<FeatureSetStats, ClientError> {
// Validator identity -> feature set
let feature_sets = rpc_client
#[derive(Debug, Default, Clone, Copy)]
struct ClusterInfoStatsEntry {
stake_percent: f64,
rpc_percent: f32,
}
struct ClusterInfoStats {
stats_map: HashMap<(u32, CliVersion), ClusterInfoStatsEntry>,
}
impl ClusterInfoStats {
fn aggregate_by_feature_set(&self) -> HashMap<u32, FeatureSetStatsEntry> {
let mut feature_set_map = HashMap::<u32, FeatureSetStatsEntry>::new();
for ((feature_set, software_version), stats_entry) in &self.stats_map {
let mut map_entry = feature_set_map.entry(*feature_set).or_default();
map_entry.rpc_nodes_percent += stats_entry.rpc_percent;
map_entry.stake_percent += stats_entry.stake_percent;
map_entry.software_versions.push(software_version.clone());
}
for stats_entry in feature_set_map.values_mut() {
stats_entry
.software_versions
.sort_by(|l, r| l.cmp(r).reverse());
}
feature_set_map
}
fn aggregate_by_software_version(&self) -> HashMap<CliVersion, ClusterInfoStatsEntry> {
let mut software_version_map = HashMap::<CliVersion, ClusterInfoStatsEntry>::new();
for ((_feature_set, software_version), stats_entry) in &self.stats_map {
let mut map_entry = software_version_map
.entry(software_version.clone())
.or_default();
map_entry.rpc_percent += stats_entry.rpc_percent;
map_entry.stake_percent += stats_entry.stake_percent;
}
software_version_map
}
}
fn cluster_info_stats(rpc_client: &RpcClient) -> Result<ClusterInfoStats, ClientError> {
#[derive(Default)]
struct StatsEntry {
stake_lamports: u64,
rpc_nodes_count: u32,
}
let cluster_info_list = rpc_client
.get_cluster_nodes()?
.into_iter()
.map(|contact_info| {
@ -539,67 +656,78 @@ fn feature_set_stats(rpc_client: &RpcClient) -> Result<FeatureSetStats, ClientEr
})
.collect::<HashMap<_, _>>();
let mut feature_set_stats: WorkingFeatureSetStats = HashMap::new();
let mut cluster_info_stats: HashMap<(u32, CliVersion), StatsEntry> = HashMap::new();
let mut total_rpc_nodes = 0;
for (node_id, feature_set, is_rpc, version) in feature_sets {
for (node_id, feature_set, is_rpc, version) in cluster_info_list {
let feature_set = feature_set.unwrap_or(0);
let feature_set_entry = feature_set_stats.entry(feature_set).or_default();
feature_set_entry.software_versions.insert(version);
let stats_entry = cluster_info_stats
.entry((feature_set, CliVersion(version)))
.or_default();
if let Some(vote_stake) = vote_stakes.get(&node_id) {
feature_set_entry.stake += *vote_stake;
stats_entry.stake_lamports += *vote_stake;
}
if is_rpc {
feature_set_entry.rpc_nodes_count += 1;
stats_entry.rpc_nodes_count += 1;
total_rpc_nodes += 1;
}
}
Ok(feature_set_stats
.into_iter()
.filter_map(
|(
feature_set,
WorkingFeatureSetStatsEntry {
stake,
rpc_nodes_count,
software_versions,
Ok(ClusterInfoStats {
stats_map: cluster_info_stats
.into_iter()
.filter_map(
|(
cluster_config,
StatsEntry {
stake_lamports,
rpc_nodes_count,
},
)| {
let stake_percent = (stake_lamports as f64 / total_active_stake as f64) * 100.;
let rpc_percent = (rpc_nodes_count as f32 / total_rpc_nodes as f32) * 100.;
if stake_percent >= 0.001 || rpc_percent >= 0.001 {
Some((
cluster_config,
ClusterInfoStatsEntry {
stake_percent,
rpc_percent,
},
))
} else {
None
}
},
)| {
let stake_percent = (stake as f64 / total_active_stake as f64) * 100.;
let rpc_nodes_percent = (rpc_nodes_count as f32 / total_rpc_nodes as f32) * 100.;
let mut software_versions = software_versions.into_iter().collect::<Vec<_>>();
software_versions.sort();
if stake_percent >= 0.001 || rpc_nodes_percent >= 0.001 {
Some((
feature_set,
FeatureSetStatsEntry {
stake_percent,
rpc_nodes_percent,
software_versions,
},
))
} else {
None
}
},
)
.collect())
)
.collect(),
})
}
// Feature activation is only allowed when 95% of the active stake is on the current feature set
fn feature_activation_allowed(
rpc_client: &RpcClient,
quiet: bool,
) -> Result<(bool, Option<CliClusterFeatureSets>), ClientError> {
let my_feature_set = solana_version::Version::default().feature_set;
let feature_set_stats = feature_set_stats(rpc_client)?;
) -> Result<
(
bool,
Option<CliClusterFeatureSets>,
Option<CliClusterSoftwareVersions>,
),
ClientError,
> {
let cluster_info_stats = cluster_info_stats(rpc_client)?;
let feature_set_stats = cluster_info_stats.aggregate_by_feature_set();
let tool_version = solana_version::Version::default();
let tool_feature_set = tool_version.feature_set;
let tool_software_version = CliVersion(Some(semver::Version::new(
tool_version.major as u64,
tool_version.minor as u64,
tool_version.patch as u64,
)));
let (stake_allowed, rpc_allowed) = feature_set_stats
.get(&my_feature_set)
.get(&tool_feature_set)
.map(
|FeatureSetStatsEntry {
stake_percent,
@ -607,31 +735,40 @@ fn feature_activation_allowed(
..
}| (*stake_percent >= 95., *rpc_nodes_percent >= 95.),
)
.unwrap_or((false, false));
.unwrap_or_default();
let cluster_software_versions = if quiet {
None
} else {
let mut software_versions: Vec<_> = cluster_info_stats
.aggregate_by_software_version()
.into_iter()
.map(|(software_version, stats)| CliSoftwareVersionStats {
software_version,
stake_percent: stats.stake_percent,
rpc_percent: stats.rpc_percent,
})
.collect();
software_versions.sort_by(|l, r| l.software_version.cmp(&r.software_version).reverse());
Some(CliClusterSoftwareVersions {
software_versions,
tool_software_version,
})
};
let cluster_feature_sets = if quiet {
None
} else {
let mut feature_sets = feature_set_stats
let mut feature_sets: Vec<_> = feature_set_stats
.into_iter()
.map(
|(
feature_set,
FeatureSetStatsEntry {
stake_percent,
rpc_nodes_percent: rpc_percent,
software_versions,
},
)| {
CliFeatureSet {
software_versions: software_versions.into_iter().map(CliVersion).collect(),
feature_set,
stake_percent,
rpc_percent,
}
},
)
.collect::<Vec<_>>();
.map(|(feature_set, stats_entry)| CliFeatureSetStats {
feature_set,
software_versions: stats_entry.software_versions,
rpc_percent: stats_entry.rpc_nodes_percent,
stake_percent: stats_entry.stake_percent,
})
.collect();
feature_sets.sort_by(|l, r| {
match l.software_versions[0]
.cmp(&r.software_versions[0])
@ -654,14 +791,18 @@ fn feature_activation_allowed(
}
});
Some(CliClusterFeatureSets {
tool_feature_set: my_feature_set,
tool_feature_set,
feature_sets,
stake_allowed,
rpc_allowed,
})
};
Ok((stake_allowed && rpc_allowed, cluster_feature_sets))
Ok((
stake_allowed && rpc_allowed,
cluster_feature_sets,
cluster_software_versions,
))
}
fn status_from_account(account: Account) -> Option<CliFeatureStatus> {
@ -734,7 +875,7 @@ fn process_status(
features.sort_unstable();
let (feature_activation_allowed, cluster_feature_sets) =
let (feature_activation_allowed, cluster_feature_sets, cluster_software_versions) =
feature_activation_allowed(rpc_client, features.len() <= 1)?;
let epoch_schedule = rpc_client.get_epoch_schedule()?;
let feature_set = CliFeatures {
@ -743,6 +884,7 @@ fn process_status(
epoch_schedule,
feature_activation_allowed,
cluster_feature_sets,
cluster_software_versions,
inactive,
};
Ok(config.output_format.formatted_string(&feature_set))