Add ramp-tps

Michael Vines 2020-04-17 20:13:38 -07:00
parent 9da366c193
commit 124287a0ea
10 changed files with 1356 additions and 0 deletions

Cargo.lock (generated)

@@ -4461,6 +4461,27 @@ dependencies = [
"solana-sdk 1.2.0",
]
[[package]]
name = "solana-ramp-tps"
version = "1.2.0"
dependencies = [
"bzip2 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"reqwest 0.10.4 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.51 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_yaml 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)",
"solana-client 1.2.0",
"solana-core 1.2.0",
"solana-logger 1.2.0",
"solana-metrics 1.2.0",
"solana-net-utils 1.2.0",
"solana-sdk 1.2.0",
"solana-stake-program 1.2.0",
"tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "solana-rayon-threadlimit"
version = "1.2.0"

Cargo.toml

@@ -48,6 +48,7 @@ members = [
"archiver-lib",
"archiver-utils",
"remote-wallet",
"ramp-tps",
"runtime",
"sdk",
"sdk-c",

ramp-tps/Cargo.toml (new file)

@@ -0,0 +1,26 @@
[package]
authors = ["Solana Maintainers <maintainers@solana.com>"]
edition = "2018"
name = "solana-ramp-tps"
description = "Solana Tour de SOL - TPS ramp up"
version = "1.2.0"
repository = "https://github.com/solana-labs/tour-de-sol"
license = "Apache-2.0"
homepage = "https://solana.com/"
[dependencies]
bzip2 = "0.3.3"
clap = "2.33.0"
log = "0.4.8"
reqwest = { version = "0.10.4", default-features = false }
serde = "1.0.106"
serde_json = "1.0.51"
serde_yaml = "0.8.11"
solana-core = { path = "../core", version = "1.2.0" }
solana-client = { path = "../client", version = "1.2.0" }
solana-logger = { path = "../logger", version = "1.2.0" }
solana-metrics = { path = "../metrics", version = "1.2.0" }
solana-net-utils = { path = "../net-utils", version = "1.2.0" }
solana-sdk = { path = "../sdk", version = "1.2.0" }
solana-stake-program = { path = "../programs/stake", version = "1.2.0" }
tar = "0.4.26"

ramp-tps/src/main.rs (new file)

@@ -0,0 +1,494 @@
//! Ramp up TPS for Tour de SOL until all validators drop out
mod notifier;
mod results;
mod stake;
mod tps;
mod utils;
mod voters;
use clap::{crate_description, crate_name, crate_version, value_t, value_t_or_exit, App, Arg};
use log::*;
use results::Results;
use solana_client::rpc_client::RpcClient;
use solana_metrics::datapoint_info;
use solana_sdk::{genesis_config::GenesisConfig, signature::read_keypair_file};
use solana_stake_program::config::{id as stake_config_id, Config as StakeConfig};
use std::{
collections::HashMap,
fs,
path::PathBuf,
process::{exit, Command},
rc::Rc,
thread::sleep,
time::Duration,
};
const NUM_BENCH_CLIENTS: usize = 2;
const TDS_ENTRYPOINT: &str = "tds.solana.com";
const TMP_LEDGER_PATH: &str = ".tmp/ledger";
const FAUCET_KEYPAIR_PATH: &str = "faucet-keypair.json";
const PUBKEY_MAP_FILE: &str = "validators/all-username.yml";
const RESULTS_FILE: &str = "results.yml";
const DEFAULT_TX_COUNT_BASELINE: &str = "5000";
const DEFAULT_TX_COUNT_INCREMENT: &str = "5000";
const DEFAULT_TPS_ROUND_MINUTES: &str = "60";
const THREAD_BATCH_SLEEP_MS: &str = "1000";
const DEFAULT_INITIAL_SOL_BALANCE: &str = "1";
// Transaction count increments linearly each round
fn tx_count_for_round(tps_round: u32, base: u64, incr: u64) -> u64 {
base + u64::from(tps_round - 1) * incr
}
// The stake gift doubles each round; round 1 receives no gift.
fn gift_for_round(tps_round: u32, initial_balance: u64) -> u64 {
if tps_round > 1 {
initial_balance * 2u64.pow(tps_round - 2)
} else {
0
}
}
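// Illustrative check of the two helpers above, using the CLI defaults
// (tx-count-baseline = 5000, tx-count-increment = 5000, initial-balance = 1 SOL).
#[cfg(test)]
mod round_math_tests {
    use super::*;
    #[test]
    fn test_round_math() {
        // Round 1 runs at the baseline; each later round adds one increment.
        assert_eq!(tx_count_for_round(1, 5000, 5000), 5000);
        assert_eq!(tx_count_for_round(3, 5000, 5000), 15_000);
        // No gift for round 1; afterwards the gift doubles every round.
        assert_eq!(gift_for_round(1, 1), 0);
        assert_eq!(gift_for_round(2, 1), 1);
        assert_eq!(gift_for_round(5, 1), 8);
    }
}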
#[allow(clippy::cognitive_complexity)]
fn main() {
solana_logger::setup_with_default("solana=debug");
solana_metrics::set_panic_hook("ramp-tps");
let mut notifier = notifier::Notifier::new();
let matches = App::new(crate_name!())
.about(crate_description!())
.version(crate_version!())
.arg(
Arg::with_name("faucet_keypair_path")
.long("faucet-keypair-path")
.short("k")
.value_name("PATH")
.takes_value(true)
.default_value(FAUCET_KEYPAIR_PATH)
.help("Path to the faucet keypair for stake award distribution"),
)
.arg(
Arg::with_name("net_dir")
.long("net-dir")
.value_name("DIR")
.takes_value(true)
.help("The directory used for running commands on the cluster"),
)
.arg(
Arg::with_name("pubkey_map_file")
.long("pubkey-map-file")
.value_name("FILE")
.default_value(PUBKEY_MAP_FILE)
.takes_value(true)
.help("YAML file that maps validator identity pubkeys to keybase user id"),
)
.arg(
Arg::with_name("results_file")
.long("results-file")
.value_name("FILE")
.default_value(RESULTS_FILE)
.takes_value(true)
.help("YAML file that lists the results for each round"),
)
.arg(
Arg::with_name("round")
.long("round")
.value_name("NUM")
.takes_value(true)
.default_value("1")
.help("The starting round of TPS ramp up"),
)
.arg(
Arg::with_name("round_minutes")
.long("round-minutes")
.value_name("NUM")
.takes_value(true)
.default_value(DEFAULT_TPS_ROUND_MINUTES)
.help("The duration in minutes of a TPS round"),
)
.arg(
Arg::with_name("tx_count_baseline")
.long("tx-count-baseline")
.value_name("NUM")
.takes_value(true)
.default_value(DEFAULT_TX_COUNT_BASELINE)
.help("The tx-count of round 1"),
)
.arg(
Arg::with_name("tx_count_increment")
.long("tx-count-increment")
.value_name("NUM")
.takes_value(true)
.default_value(DEFAULT_TX_COUNT_INCREMENT)
.help("The tx-count increment for the next round"),
)
.arg(
Arg::with_name("initial_balance")
.long("initial-balance")
.value_name("SOL")
.takes_value(true)
.default_value(DEFAULT_INITIAL_SOL_BALANCE)
.help("The number of SOL that each partipant started with"),
)
.arg(
Arg::with_name("entrypoint")
.short("n")
.long("entrypoint")
.value_name("HOST")
.takes_value(true)
.default_value(TDS_ENTRYPOINT)
.validator(utils::is_host)
.help("The entrypoint used for RPC calls"),
)
.arg(
Arg::with_name("stake_activation_epoch")
.long("stake-activation-epoch")
.value_name("NUM")
.takes_value(true)
.help("The stake activated in this epoch must fully warm up before the first round begins"),
)
.arg(
Arg::with_name("destake_net_nodes_epoch")
.long("destake-net-nodes-epoch")
.value_name("NUM")
.takes_value(true)
.default_value("9")
.help("The epoch for which to run destake-net-nodes.sh at"),
)
.get_matches();
let pubkey_map_file = value_t_or_exit!(matches, "pubkey_map_file", String);
let pubkey_map: HashMap<String, String> =
serde_yaml::from_reader(fs::File::open(&pubkey_map_file).unwrap_or_else(|err| {
eprintln!(
"Error: Unable to open --pubkey-map-file {}: {}",
pubkey_map_file, err
);
exit(1);
}))
.unwrap_or_else(|err| {
eprintln!(
"Error: Unable to parse --pubkey-map-file {}: {}",
pubkey_map_file, err
);
exit(1);
});
let pubkey_to_keybase = Rc::new(move |pubkey: &solana_sdk::pubkey::Pubkey| -> String {
let pubkey = pubkey.to_string();
match pubkey_map.get(&pubkey) {
Some(keybase) => format!("{} ({})", keybase, pubkey),
None => pubkey,
}
});
let net_dir = value_t_or_exit!(matches, "net_dir", String);
let faucet_keypair_path = value_t_or_exit!(matches, "faucet_keypair_path", String);
let faucet_keypair = read_keypair_file(&faucet_keypair_path)
.unwrap_or_else(|err| panic!("Unable to read {}: {}", faucet_keypair_path, err));
let mut tps_round = value_t_or_exit!(matches, "round", u32).max(1);
let results_file_name = value_t_or_exit!(matches, "results_file", String);
let previous_results = Results::read(&results_file_name);
let mut tps_round_results = Results::new(results_file_name, previous_results, tps_round);
let tx_count_baseline = value_t_or_exit!(matches, "tx_count_baseline", u64);
let tx_count_increment = value_t_or_exit!(matches, "tx_count_increment", u64);
let round_minutes = value_t_or_exit!(matches, "round_minutes", u64).max(1);
let round_duration = Duration::from_secs(round_minutes * 60);
let initial_balance = value_t_or_exit!(matches, "initial_balance", u64);
let tmp_ledger_path = PathBuf::from(TMP_LEDGER_PATH);
let _ = fs::remove_dir_all(&tmp_ledger_path);
fs::create_dir_all(&tmp_ledger_path).expect("failed to create temp ledger path");
notifier.notify("Hi!");
datapoint_info!("ramp-tps", ("event", "boot", String),);
let entrypoint_str = matches.value_of("entrypoint").unwrap();
debug!("Connecting to {}", entrypoint_str);
let entrypoint_addr = solana_net_utils::parse_host_port(&format!("{}:8899", entrypoint_str))
.expect("failed to parse entrypoint address");
utils::download_genesis(&entrypoint_addr, &tmp_ledger_path).expect("genesis download failed");
let genesis_config =
GenesisConfig::load(&tmp_ledger_path).expect("failed to load genesis block");
debug!("Fetching current slot...");
let rpc_client = RpcClient::new_socket_with_timeout(entrypoint_addr, Duration::from_secs(10));
let current_slot = rpc_client.get_slot().expect("failed to fetch current slot");
debug!("Current slot: {}", current_slot);
let epoch_schedule = &genesis_config.epoch_schedule;
let first_normal_slot = epoch_schedule.first_normal_slot;
debug!("First normal slot: {}", first_normal_slot);
let sleep_slots = first_normal_slot.saturating_sub(current_slot);
if sleep_slots > 0 {
notifier.notify(&format!(
"Waiting for warm-up epochs to complete (epoch {})",
epoch_schedule.first_normal_epoch
));
utils::sleep_until_epoch(
&rpc_client,
&notifier,
&genesis_config,
current_slot,
epoch_schedule.first_normal_epoch,
);
}
debug!("Fetching stake config...");
let stake_config_account = rpc_client
.get_account(&stake_config_id())
.expect("failed to fetch stake config");
let stake_config = StakeConfig::from(&stake_config_account).unwrap();
// Check if destake-net-nodes.sh should be run
{
let epoch_info = rpc_client.get_epoch_info().unwrap();
let destake_net_nodes_epoch = value_t_or_exit!(matches, "destake_net_nodes_epoch", u64);
if epoch_info.epoch >= destake_net_nodes_epoch {
info!(
"Current epoch {} >= destake_net_nodes_epoch of {}, skipping destake-net-nodes.sh",
epoch_info.epoch, destake_net_nodes_epoch
);
} else {
info!(
"Waiting for destake-net-nodes epoch {}",
destake_net_nodes_epoch
);
utils::sleep_until_epoch(
&rpc_client,
&notifier,
&genesis_config,
epoch_info.absolute_slot,
destake_net_nodes_epoch,
);
info!("Destaking net nodes...");
Command::new("bash")
.args(&["destake-net-nodes.sh", &net_dir])
.spawn()
.unwrap();
info!("Done destaking net nodes");
}
}
// Wait for the next epoch, or --stake-activation-epoch
{
let epoch_info = rpc_client.get_epoch_info().unwrap();
let activation_epoch = value_t!(matches, "stake_activation_epoch", u64)
.ok()
.unwrap_or(epoch_info.epoch - 1);
debug!("Current epoch info: {:?}", &epoch_info);
debug!("Activation epoch is: {:?}", activation_epoch);
stake::wait_for_warm_up(
activation_epoch,
epoch_info,
&rpc_client,
&stake_config,
&genesis_config,
&notifier,
);
}
let mut tps_sampler = tps::Sampler::new(&entrypoint_addr);
loop {
notifier.notify(&format!("Round {}!", tps_round));
let tx_count = tx_count_for_round(tps_round, tx_count_baseline, tx_count_increment);
datapoint_info!(
"ramp-tps",
("event", "round-start", String),
("round", tps_round, i64),
("tx_count", tx_count, i64)
);
let latest_slot = rpc_client.get_slot().unwrap_or_else(|err| {
utils::bail(
&notifier,
&format!("Error: get latest slot failed: {}", err),
);
});
sleep(Duration::from_secs(5));
let round_start_slot = rpc_client.get_slot().unwrap_or_else(|err| {
utils::bail(
&notifier,
&format!("Error: get round start slot failed: {}", err),
);
});
if round_start_slot == latest_slot {
utils::bail(
&notifier,
&format!("Slot is not advancing from {}", latest_slot),
);
}
let starting_validators = voters::fetch_active_validators(&rpc_client);
datapoint_info!(
"ramp-tps",
("event", "start-transactions", String),
("round", tps_round, i64),
("validators", starting_validators.len(), i64)
);
notifier.buffer(format!(
"There are {} validators present:",
starting_validators.len()
));
let mut validators: Vec<_> = starting_validators
.keys()
.map(|node_pubkey| format!("* {}", pubkey_to_keybase(&node_pubkey)))
.collect();
validators.sort();
notifier.buffer_vec(validators);
notifier.flush();
let client_tx_count = tx_count / NUM_BENCH_CLIENTS as u64;
notifier.notify(&format!(
"Starting transactions for {} minutes (batch size={})",
round_minutes, tx_count,
));
info!(
"Running bench-tps={}='--tx_count={} --thread-batch-sleep-ms={}'",
NUM_BENCH_CLIENTS, client_tx_count, THREAD_BATCH_SLEEP_MS
);
for client_id in 0..NUM_BENCH_CLIENTS {
Command::new("bash")
.args(&[
"wrapper-bench-tps.sh",
&net_dir,
&client_id.to_string(),
&client_tx_count.to_string(),
THREAD_BATCH_SLEEP_MS,
])
.spawn()
.unwrap();
}
let bench_warmup_secs = 60;
info!(
"Sleeping {}s to allow bench-tps to warmup",
bench_warmup_secs
);
sleep(Duration::from_secs(bench_warmup_secs));
tps_sampler.start_sampling_thread();
sleep(round_duration);
tps_sampler.stop_sampling_thread();
for client_id in 0..NUM_BENCH_CLIENTS {
Command::new("bash")
.args(&[
"wrapper-bench-tps.sh",
&net_dir,
&client_id.to_string(),
"0", // Setting txCount to 0 will kill bench-tps
THREAD_BATCH_SLEEP_MS,
])
.spawn()
.unwrap();
}
datapoint_info!(
"ramp-tps",
("event", "stop-transactions", String),
("round", tps_round, i64),
);
notifier.notify("Transactions stopped");
tps_sampler.report_results(&notifier);
let remaining_validators = voters::fetch_active_validators(&rpc_client);
let remaining_keybase = remaining_validators
.keys()
.map(|k| pubkey_to_keybase(k))
.collect();
tps_round_results
.record(tps_round, remaining_keybase)
.unwrap_or_else(|err| {
warn!("Failed to record round results: {}", err);
});
if remaining_validators.is_empty() {
utils::bail(&notifier, "No validators remain");
}
datapoint_info!(
"ramp-tps",
("event", "calculate-leader-records", String),
("round", tps_round, i64),
("validators", remaining_validators.len(), i64)
);
let round_end_slot = rpc_client.get_slot().unwrap_or_else(|err| {
utils::bail(
&notifier,
&format!("Error: get round end slot failed: {}", err),
);
});
let leader_records = voters::calculate_leader_records(
&rpc_client,
&epoch_schedule,
round_start_slot,
round_end_slot,
&notifier,
)
.unwrap_or_else(|err| {
utils::bail(
&notifier,
&format!("Error: Could not calculate leader records: {}", err),
);
});
voters::announce_results(
&starting_validators,
&remaining_validators,
pubkey_to_keybase.clone(),
&leader_records,
&mut notifier,
);
datapoint_info!(
"ramp-tps",
("event", "gifting", String),
("round", tps_round, i64)
);
let healthy_validators: Vec<_> = remaining_validators
.iter()
.filter(|(k, _)| leader_records.get(k).map(|r| r.healthy()).unwrap_or(false))
.map(|(node_pubkey, vote_account_pubkey)| {
(pubkey_to_keybase(&node_pubkey), vote_account_pubkey)
})
.collect();
let next_gift = gift_for_round(tps_round + 1, initial_balance);
voters::award_stake(
&rpc_client,
&faucet_keypair,
healthy_validators,
next_gift,
&mut notifier,
);
datapoint_info!(
"ramp-tps",
("event", "new-stake-warmup", String),
("round", tps_round, i64)
);
// Wait for stake to warm up before starting the next round
let epoch_info = rpc_client.get_epoch_info().unwrap();
debug!("Current epoch info: {:?}", &epoch_info);
let current_epoch = epoch_info.epoch;
stake::wait_for_warm_up(
current_epoch,
epoch_info,
&rpc_client,
&stake_config,
&genesis_config,
&notifier,
);
tps_round += 1;
}
}
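In sum, main() waits out the cluster's warm-up epochs and any pending stake activation, then loops: launch the two wrapper-bench-tps.sh clients, sample TPS for the round, stop the clients, record and announce the surviving validators, delegate the doubled stake gift to the healthy ones, and wait for that stake to warm up before the next round. A typical invocation might look like the following (paths are illustrative; --entrypoint and --faucet-keypair-path fall back to the defaults shown above):

solana-ramp-tps --net-dir ~/tds-net --round 1 --round-minutes 60 --tx-count-baseline 5000 --tx-count-increment 5000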

ramp-tps/src/notifier.rs (new file)

@@ -0,0 +1,92 @@
use log::*;
use reqwest::{blocking::Client, StatusCode};
use serde_json::json;
use std::{env, thread::sleep, time::Duration};
/// For each notification
/// 1) Log an info level message
/// 2) Notify Slack channel if Slack is configured
/// 3) Notify Discord channel if Discord is configured
pub struct Notifier {
buffer: Vec<String>,
client: Client,
discord_webhook: Option<String>,
slack_webhook: Option<String>,
}
impl Notifier {
pub fn new() -> Self {
let discord_webhook = env::var("DISCORD_WEBHOOK")
.map_err(|_| {
warn!("Discord notifications disabled");
})
.ok();
let slack_webhook = env::var("SLACK_WEBHOOK")
.map_err(|_| {
warn!("Slack notifications disabled");
})
.ok();
Notifier {
buffer: Vec::new(),
client: Client::new(),
discord_webhook,
slack_webhook,
}
}
fn send(&self, msg: &str) {
if let Some(webhook) = &self.discord_webhook {
for line in msg.split('\n') {
// Discord rate limiting is aggressive, limit to 1 message a second to keep
// it from getting mad at us...
sleep(Duration::from_millis(1000));
info!("Sending {}", line);
let data = json!({ "content": line });
loop {
let response = self.client.post(webhook).json(&data).send();
if let Err(err) = response {
warn!("Failed to send Discord message: \"{}\": {:?}", line, err);
break;
} else if let Ok(response) = response {
info!("response status: {}", response.status());
if response.status() == StatusCode::TOO_MANY_REQUESTS {
warn!("rate limited!...");
warn!("response text: {:?}", response.text());
std::thread::sleep(Duration::from_secs(2));
} else {
break;
}
}
}
}
}
if let Some(webhook) = &self.slack_webhook {
let data = json!({ "text": msg });
if let Err(err) = self.client.post(webhook).json(&data).send() {
warn!("Failed to send Slack message: {:?}", err);
}
}
}
pub fn buffer(&mut self, msg: String) {
self.buffer.push(msg);
}
pub fn buffer_vec(&mut self, mut msgs: Vec<String>) {
self.buffer.append(&mut msgs);
}
pub fn flush(&mut self) {
self.notify(&self.buffer.join("\n"));
self.buffer.clear();
}
pub fn notify(&self, msg: &str) {
info!("{}", msg);
self.send(msg);
}
}
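A minimal usage sketch for the notifier above (the messages are illustrative; webhooks come from the optional DISCORD_WEBHOOK and SLACK_WEBHOOK environment variables):

fn example_notifications() {
    let mut notifier = Notifier::new();
    // Immediate form: logged with `info!` and fanned out to any configured webhooks.
    notifier.notify("Round 1!");
    // Buffered form: accumulate lines, then send them as one message.
    notifier.buffer("Healthy Validators:".to_string());
    notifier.buffer_vec(vec!["* alice".to_string(), "* bob".to_string()]);
    notifier.flush(); // joins the buffer with '\n', sends it, and clears the buffer
}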

ramp-tps/src/results.rs (new file)

@@ -0,0 +1,92 @@
use log::*;
use serde::{Serialize, Serializer};
use std::{
collections::{BTreeMap, HashMap},
error::Error,
fs::File,
io::ErrorKind,
process::exit,
str::FromStr,
};
const ROUND_KEY_PREFIX: &str = "round-";
#[derive(Eq, PartialEq, Ord, PartialOrd)]
struct Round(u32);
impl Serialize for Round {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&format!("{}{}", ROUND_KEY_PREFIX, self.0))
}
}
pub struct Results {
file_path: String,
results: BTreeMap<Round, Vec<String>>,
}
impl Results {
/// Keep any result entries which occurred before the starting round.
pub fn new(
file_path: String,
mut previous_results: HashMap<String, Vec<String>>,
start_round: u32,
) -> Self {
let mut results: BTreeMap<Round, Vec<String>> = BTreeMap::new();
previous_results.drain().for_each(|(key, value)| {
if key.starts_with(ROUND_KEY_PREFIX) {
let round_str = &key[ROUND_KEY_PREFIX.len()..];
if let Ok(round) = u32::from_str(round_str) {
if round < start_round {
results.insert(Round(round), value);
}
}
}
});
Results { file_path, results }
}
    /// Reads the previous results file and, if it exists, parses the contents
pub fn read(file_path: &str) -> HashMap<String, Vec<String>> {
match File::open(file_path) {
Ok(file) => serde_yaml::from_reader(&file)
.map_err(|err| {
warn!("Failed to recover previous results: {}", err);
})
.unwrap_or_default(),
Err(err) => match err.kind() {
ErrorKind::NotFound => {
// Check that we can write to this file
File::create(file_path).unwrap_or_else(|err| {
eprintln!(
"Error: Unable to create --results-file {}: {}",
file_path, err
);
exit(1);
});
HashMap::new()
}
err => {
eprintln!(
"Error: Unable to open --results-file {}: {:?}",
file_path, err
);
exit(1);
}
},
}
}
/// Record the remaining validators after each TPS round
pub fn record(&mut self, round: u32, validators: Vec<String>) -> Result<(), Box<dyn Error>> {
self.results.insert(Round(round), validators);
let file = File::create(&self.file_path)?;
serde_yaml::to_writer(&file, &self.results)?;
Ok(())
}
}
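A sketch of the read/record round trip (the file name matches the RESULTS_FILE default in main.rs; the validator name is illustrative):

fn example_round_trip() {
    // On a fresh run this returns an empty map and verifies the file is writable.
    let previous = Results::read("results.yml");
    let mut results = Results::new("results.yml".to_string(), previous, 1);
    // results.yml now gains a "round-1" key listing the surviving validators.
    results.record(1, vec!["alice".to_string()]).unwrap();
}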

ramp-tps/src/stake.rs (new file)

@@ -0,0 +1,143 @@
use crate::{notifier, utils};
use log::*;
use solana_client::{rpc_client::RpcClient, rpc_response::RpcEpochInfo};
use solana_sdk::{
clock::Epoch,
genesis_config::GenesisConfig,
stake_history::StakeHistoryEntry,
sysvar::{
stake_history::{self, StakeHistory},
Sysvar,
},
};
use solana_stake_program::config::Config as StakeConfig;
use std::{thread::sleep, time::Duration};
fn calculate_stake_warmup(mut stake_entry: StakeHistoryEntry, stake_config: &StakeConfig) -> u64 {
let mut epochs = 0;
loop {
let percent_warming_up =
stake_entry.activating as f64 / stake_entry.effective.max(1) as f64;
let percent_cooling_down =
stake_entry.deactivating as f64 / stake_entry.effective.max(1) as f64;
debug!(
"epoch +{}: stake warming up {:.1}%, cooling down {:.1}% ",
epochs,
percent_warming_up * 100.,
percent_cooling_down * 100.
);
if (percent_warming_up < 0.05) && (percent_cooling_down < 0.05) {
break;
}
let warmup_cooldown_rate = stake_config.warmup_cooldown_rate;
let max_warmup_stake = (stake_entry.effective as f64 * warmup_cooldown_rate) as u64;
let warmup_stake = stake_entry.activating.min(max_warmup_stake);
stake_entry.effective += warmup_stake;
stake_entry.activating -= warmup_stake;
let max_cooldown_stake = (stake_entry.effective as f64 * warmup_cooldown_rate) as u64;
let cooldown_stake = stake_entry.deactivating.min(max_cooldown_stake);
stake_entry.effective -= cooldown_stake;
stake_entry.deactivating -= cooldown_stake;
debug!(
"epoch +{}: stake warming up {}, cooling down {}",
epochs, warmup_stake, cooldown_stake
);
epochs += 1;
}
info!("95% stake warmup will take {} epochs", epochs);
epochs
}
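// Worked example for the simulation above (a sketch; 0.25 matches the stake
// config's default warmup_cooldown_rate of the era): starting from 1000
// effective and 1000 activating lamports, 250 lamports activate in the first
// epoch, then 312, then 390, leaving ~2.5% still activating, which falls
// under the 5% threshold.
#[cfg(test)]
mod warmup_tests {
    use super::*;
    #[test]
    fn test_calculate_stake_warmup() {
        let entry = StakeHistoryEntry {
            effective: 1000,
            activating: 1000,
            deactivating: 0,
        };
        let config = StakeConfig {
            warmup_cooldown_rate: 0.25,
            ..StakeConfig::default()
        };
        assert_eq!(calculate_stake_warmup(entry, &config), 3);
    }
}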
fn stake_history_entry(epoch: Epoch, rpc_client: &RpcClient) -> Option<StakeHistoryEntry> {
let stake_history_account = rpc_client.get_account(&stake_history::id()).ok()?;
let stake_history = StakeHistory::from_account(&stake_history_account)?;
stake_history.get(&epoch).cloned()
}
/// Wait until stake warms up and return the current epoch
pub fn wait_for_warm_up(
activation_epoch: Epoch,
mut epoch_info: RpcEpochInfo,
rpc_client: &RpcClient,
stake_config: &StakeConfig,
genesis_config: &GenesisConfig,
notifier: &notifier::Notifier,
) {
// Sleep until activation_epoch has finished
if epoch_info.epoch <= activation_epoch {
notifier.notify(&format!(
"Waiting until epoch {} is finished...",
activation_epoch
));
utils::sleep_until_epoch(
rpc_client,
notifier,
genesis_config,
epoch_info.absolute_slot,
activation_epoch + 1,
);
}
loop {
epoch_info = rpc_client.get_epoch_info().unwrap_or_else(|err| {
utils::bail(
notifier,
&format!("Error: get_epoch_info RPC call failed: {}", err),
);
});
let current_slot = epoch_info.absolute_slot;
info!("Current slot is {}", current_slot);
let current_epoch = epoch_info.epoch;
let latest_epoch = current_epoch - 1;
debug!(
"Fetching stake history entry for epoch: {}...",
latest_epoch
);
if let Some(stake_entry) = stake_history_entry(latest_epoch, &rpc_client) {
debug!("Stake history entry: {:?}", &stake_entry);
let warm_up_epochs = calculate_stake_warmup(stake_entry, stake_config);
let stake_warmed_up_epoch = latest_epoch + warm_up_epochs;
if stake_warmed_up_epoch > current_epoch {
notifier.notify(&format!(
"Waiting until epoch {} for stake to warmup (current epoch is {})...",
stake_warmed_up_epoch, current_epoch
));
utils::sleep_until_epoch(
rpc_client,
notifier,
genesis_config,
current_slot,
stake_warmed_up_epoch,
);
} else {
break;
}
} else {
warn!(
"Failed to fetch stake history entry for epoch: {}",
latest_epoch
);
sleep(Duration::from_secs(5));
}
let latest_slot = rpc_client.get_slot().unwrap_or_else(|err| {
utils::bail(
notifier,
&format!("Error: get_slot RPC call 3 failed: {}", err),
);
});
if current_slot == latest_slot {
utils::bail(
notifier,
&format!("Error: Slot did not advance from {}", current_slot),
);
}
}
}

ramp-tps/src/tps.rs (new file)

@@ -0,0 +1,72 @@
use crate::notifier::Notifier;
use log::*;
use solana_client::perf_utils::{sample_txs, SampleStats};
use solana_client::thin_client::ThinClient;
use solana_sdk::timing::duration_as_s;
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{Builder, JoinHandle},
};
pub struct Sampler {
client: Arc<ThinClient>,
exit_signal: Arc<AtomicBool>,
maxes: Arc<RwLock<Vec<(String, SampleStats)>>>,
handle: Option<JoinHandle<()>>,
}
impl Sampler {
pub fn new(rpc_addr: &SocketAddr) -> Self {
let (_, dummy_socket) =
solana_net_utils::bind_in_range(rpc_addr.ip(), (8000, 10_000)).unwrap();
let dummy_tpu_addr = *rpc_addr;
let client = Arc::new(ThinClient::new(*rpc_addr, dummy_tpu_addr, dummy_socket));
Self {
client,
exit_signal: Arc::new(AtomicBool::new(false)),
maxes: Arc::new(RwLock::new(Vec::new())),
handle: None,
}
}
    // Set up a thread to sample every period and
// collect the max transaction rate and total tx count seen
pub fn start_sampling_thread(&mut self) {
// Reset
self.exit_signal.store(false, Ordering::Relaxed);
self.maxes.write().unwrap().clear();
let sample_period = 5; // in seconds
info!("Sampling TPS every {} seconds...", sample_period);
let exit_signal = self.exit_signal.clone();
let maxes = self.maxes.clone();
let client = self.client.clone();
let handle = Builder::new()
.name("solana-client-sample".to_string())
.spawn(move || {
sample_txs(&exit_signal, &maxes, sample_period, &client);
})
.unwrap();
self.handle = Some(handle);
}
pub fn stop_sampling_thread(&mut self) {
self.exit_signal.store(true, Ordering::Relaxed);
self.handle.take().unwrap().join().unwrap();
}
pub fn report_results(&self, notifier: &Notifier) {
let SampleStats { tps, elapsed, txs } = self.maxes.read().unwrap()[0].1;
let avg_tps = txs as f32 / duration_as_s(&elapsed);
notifier.notify(&format!(
"Highest TPS: {:.0}, Average TPS: {:.0}",
tps, avg_tps
));
}
}
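A sketch of the sampling lifecycle as main.rs drives it (the address and sleep duration are illustrative):

fn example_sampling(notifier: &Notifier) {
    let entrypoint: std::net::SocketAddr = "127.0.0.1:8899".parse().unwrap();
    let mut sampler = Sampler::new(&entrypoint);
    sampler.start_sampling_thread(); // samples on a background thread every 5s
    std::thread::sleep(std::time::Duration::from_secs(60));
    sampler.stop_sampling_thread(); // signals exit and joins the thread
    sampler.report_results(notifier); // note: panics if no sample was collected
}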

ramp-tps/src/utils.rs (new file)

@@ -0,0 +1,168 @@
use crate::notifier::Notifier;
use bzip2::bufread::BzDecoder;
use log::*;
use solana_client::rpc_client::RpcClient;
use solana_net_utils::parse_host;
use solana_sdk::{
clock::{Epoch, Slot},
genesis_config::GenesisConfig,
timing::duration_as_ms,
};
use std::{
fs::File,
io,
net::SocketAddr,
path::Path,
thread::sleep,
time::{Duration, Instant},
};
use tar::Archive;
const GENESIS_ARCHIVE_NAME: &str = "genesis.tar.bz2";
/// Inspired by solana_local_cluster::cluster_tests
fn slots_to_secs(num_slots: u64, genesis_config: &GenesisConfig) -> u64 {
let poh_config = &genesis_config.poh_config;
let ticks_per_slot = genesis_config.ticks_per_slot;
let num_ticks_to_sleep = num_slots as f64 * ticks_per_slot as f64;
let num_ticks_per_second = (1000 / duration_as_ms(&poh_config.target_tick_duration)) as f64;
((num_ticks_to_sleep + num_ticks_per_second - 1.0) / num_ticks_per_second) as u64
}
fn sleep_n_slots(num_slots: u64, genesis_config: &GenesisConfig) {
let secs = slots_to_secs(num_slots, genesis_config);
let mins = secs / 60;
let hours = mins / 60;
if hours >= 5 {
debug!("Sleeping for {} slots ({} hours)", num_slots, hours);
} else if mins >= 5 {
debug!("Sleeping for {} slots ({} minutes)", num_slots, mins);
} else if secs > 0 {
debug!("Sleeping for {} slots ({} seconds)", num_slots, secs);
}
sleep(Duration::from_secs(secs));
}
/// Sleep until the target epoch has started or bail if cluster is stuck
pub fn sleep_until_epoch(
rpc_client: &RpcClient,
notifier: &Notifier,
genesis_config: &GenesisConfig,
mut current_slot: Slot,
target_epoch: Epoch,
) {
let target_slot = genesis_config
.epoch_schedule
.get_first_slot_in_epoch(target_epoch);
info!(
"sleep_until_epoch() target_epoch: {}, target_slot: {}",
target_epoch, target_slot
);
loop {
let sleep_slots = target_slot.saturating_sub(current_slot);
if sleep_slots == 0 {
break;
}
sleep_n_slots(sleep_slots.max(50), genesis_config);
let latest_slot = rpc_client.get_slot().unwrap_or_else(|err| {
bail(
notifier,
&format!("Error: Could not fetch current slot: {}", err),
);
});
if current_slot == latest_slot {
bail(
notifier,
&format!("Error: Slot did not advance from {}", current_slot),
);
} else {
current_slot = latest_slot;
}
}
}
pub fn is_host(string: String) -> Result<(), String> {
parse_host(&string)?;
Ok(())
}
pub fn bail(notifier: &crate::notifier::Notifier, msg: &str) -> ! {
notifier.notify(msg);
sleep(Duration::from_secs(30)); // Wait for notifications to send
std::process::exit(1);
}
/// Inspired by solana_validator::download_tar_bz2
pub fn download_genesis(rpc_addr: &SocketAddr, download_path: &Path) -> Result<(), String> {
let archive_name = GENESIS_ARCHIVE_NAME;
let archive_path = download_path.join(archive_name);
let url = format!("http://{}/{}", rpc_addr, archive_name);
let download_start = Instant::now();
debug!("Downloading genesis ({})...", url);
let client = reqwest::blocking::Client::new();
let mut response = client
.get(url.as_str())
.send()
.and_then(|response| response.error_for_status())
.map_err(|err| format!("Unable to get: {:?}", err))?;
let download_size = {
response
.headers()
.get(reqwest::header::CONTENT_LENGTH)
.and_then(|content_length| content_length.to_str().ok())
.and_then(|content_length| content_length.parse().ok())
.unwrap_or(0)
};
let mut file = File::create(&archive_path)
.map_err(|err| format!("Unable to create {:?}: {:?}", archive_path, err))?;
io::copy(&mut response, &mut file)
.map_err(|err| format!("Unable to write {:?}: {:?}", archive_path, err))?;
debug!(
"Downloaded genesis ({} bytes) in {:?}",
download_size,
Instant::now().duration_since(download_start),
);
debug!("Extracting genesis ({})...", archive_name);
let extract_start = Instant::now();
let tar_bz2 = File::open(&archive_path)
.map_err(|err| format!("Unable to open {}: {:?}", archive_name, err))?;
let tar = BzDecoder::new(io::BufReader::new(tar_bz2));
let mut archive = Archive::new(tar);
archive
.unpack(download_path)
.map_err(|err| format!("Unable to unpack {}: {:?}", archive_name, err))?;
debug!(
"Extracted {} in {:?}",
archive_name,
Instant::now().duration_since(extract_start)
);
Ok(())
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_slots_to_secs() {
let mut genesis_config = GenesisConfig::default();
genesis_config.poh_config.target_tick_duration = Duration::from_millis(500);
genesis_config.ticks_per_slot = 10;
assert_eq!(slots_to_secs(2, &genesis_config), 10);
genesis_config.ticks_per_slot = 1;
assert_eq!(slots_to_secs(1, &genesis_config), 1);
genesis_config.ticks_per_slot = 0;
assert_eq!(slots_to_secs(10, &genesis_config), 0);
}
}

ramp-tps/src/voters.rs (new file)

@@ -0,0 +1,247 @@
use crate::notifier::Notifier;
use crate::utils;
use log::*;
use solana_client::{client_error::Result as ClientResult, rpc_client::RpcClient};
use solana_sdk::{
clock::Slot,
epoch_schedule::EpochSchedule,
native_token::sol_to_lamports,
pubkey::Pubkey,
signature::{Keypair, Signer},
transaction::Transaction,
};
use solana_stake_program::{
stake_instruction,
stake_state::{Authorized as StakeAuthorized, Lockup},
};
use std::{
collections::{HashMap, HashSet},
rc::Rc,
str::FromStr,
thread::sleep,
time::Duration,
};
// The minimum percentage of leader slots that a validator must complete in order to
// receive the stake reward at the end of a TPS round.
const MIN_LEADER_SLOT_PCT: f64 = 80.0;
#[derive(Default)]
pub struct LeaderRecord {
total_slots: u64,
missed_slots: u64,
}
impl LeaderRecord {
pub fn completed_slot_pct(&self) -> f64 {
if self.total_slots == 0 {
0f64
} else {
let completed_slots = self.total_slots - self.missed_slots;
100f64 * completed_slots as f64 / self.total_slots as f64
}
}
pub fn healthy(&self) -> bool {
self.completed_slot_pct() >= MIN_LEADER_SLOT_PCT
}
}
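// Illustrative check of the 80% health threshold above (numbers are arbitrary).
#[cfg(test)]
mod leader_record_tests {
    use super::*;
    #[test]
    fn test_healthy_threshold() {
        // Completing 8 of 10 leader slots is exactly 80%, which counts as healthy.
        let record = LeaderRecord { total_slots: 10, missed_slots: 2 };
        assert!(record.healthy());
        // Completing 7 of 10 falls below the threshold.
        let record = LeaderRecord { total_slots: 10, missed_slots: 3 };
        assert!(!record.healthy());
    }
}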
/// Calculate the leader record for each active validator
pub fn calculate_leader_records(
rpc_client: &RpcClient,
epoch_schedule: &EpochSchedule,
start_slot: Slot,
end_slot: Slot,
notifier: &Notifier,
) -> ClientResult<HashMap<Pubkey, LeaderRecord>> {
let start_epoch = epoch_schedule.get_epoch(start_slot);
let end_epoch = epoch_schedule.get_epoch(end_slot);
let confirmed_blocks: HashSet<_> = rpc_client
.get_confirmed_blocks(start_slot, Some(end_slot))?
.into_iter()
.collect();
let mut leader_records = HashMap::<Pubkey, LeaderRecord>::new();
for epoch in start_epoch..=end_epoch {
let first_slot_in_epoch = epoch_schedule.get_first_slot_in_epoch(epoch);
let start_slot = std::cmp::max(start_slot, first_slot_in_epoch);
let last_slot_in_epoch = epoch_schedule.get_last_slot_in_epoch(epoch);
let end_slot = std::cmp::min(end_slot, last_slot_in_epoch);
rpc_client
.get_leader_schedule(Some(start_slot))?
.unwrap_or_else(|| utils::bail(notifier, "Error: Leader schedule was not found"))
.into_iter()
.map(|(pk, s)| (Pubkey::from_str(&pk).unwrap(), s))
.for_each(|(pubkey, leader_slots)| {
let mut record = leader_records.entry(pubkey).or_default();
for slot_index in leader_slots.iter() {
let slot = (*slot_index as u64) + first_slot_in_epoch;
if slot >= start_slot && slot <= end_slot {
record.total_slots += 1;
if !confirmed_blocks.contains(&slot) {
record.missed_slots += 1;
}
}
}
});
}
Ok(leader_records)
}
pub fn fetch_active_validators(rpc_client: &RpcClient) -> HashMap<Pubkey, Pubkey> {
match rpc_client.get_vote_accounts() {
Err(err) => {
warn!("Failed to get_vote_accounts(): {}", err);
HashMap::new()
}
Ok(vote_accounts) => vote_accounts
.current
.into_iter()
.filter_map(|info| {
if let (Ok(node_pubkey), Ok(vote_pubkey)) = (
Pubkey::from_str(&info.node_pubkey),
Pubkey::from_str(&info.vote_pubkey),
) {
Some((node_pubkey, vote_pubkey))
} else {
None
}
})
.collect(),
}
}
/// Endlessly retry stake delegation until success
fn delegate_stake(
rpc_client: &RpcClient,
faucet_keypair: &Keypair,
vote_account_pubkey: &Pubkey,
sol_gift: u64,
) {
let stake_account_keypair = Keypair::new();
info!(
"delegate_stake: stake pubkey: {}",
stake_account_keypair.pubkey()
);
let mut retry_count = 0;
loop {
let recent_blockhash = loop {
match rpc_client.get_recent_blockhash() {
Ok(response) => break response.0,
Err(err) => {
error!("Failed to get recent blockhash: {}", err);
sleep(Duration::from_secs(5));
}
}
};
let mut transaction = Transaction::new_signed_instructions(
&[faucet_keypair, &stake_account_keypair],
stake_instruction::create_account_and_delegate_stake(
&faucet_keypair.pubkey(),
&stake_account_keypair.pubkey(),
&vote_account_pubkey,
&StakeAuthorized::auto(&faucet_keypair.pubkey()),
&Lockup::default(),
sol_to_lamports(sol_gift as f64),
),
recent_blockhash,
);
// Check if stake was delegated but just failed to confirm on an earlier attempt
if retry_count > 0 {
if let Ok(stake_account) = rpc_client.get_account(&stake_account_keypair.pubkey()) {
if stake_account.owner == solana_stake_program::id() {
break;
}
}
}
if let Err(err) = rpc_client.send_and_confirm_transaction(
&mut transaction,
&[faucet_keypair, &stake_account_keypair],
) {
error!(
"Failed to delegate stake (retries: {}): {}",
retry_count, err
);
retry_count += 1;
sleep(Duration::from_secs(5));
} else {
break;
}
}
}
/// Announce each validator's status and leader slot performance
pub fn announce_results(
starting_validators: &HashMap<Pubkey, Pubkey>,
remaining_validators: &HashMap<Pubkey, Pubkey>,
pubkey_to_keybase: Rc<dyn Fn(&Pubkey) -> String>,
leader_records: &HashMap<Pubkey, LeaderRecord>,
notifier: &mut Notifier,
) {
let buffer_records = |keys: Vec<&Pubkey>, notifier: &mut Notifier| {
if keys.is_empty() {
notifier.buffer("* None".to_string());
return;
}
let mut validators = vec![];
for pubkey in keys {
let name = pubkey_to_keybase(pubkey);
if let Some(record) = leader_records.get(pubkey) {
validators.push(format!(
"* {} ({:.1}% leader efficiency)",
name,
record.completed_slot_pct()
));
}
}
validators.sort();
notifier.buffer_vec(validators);
};
let healthy: Vec<_> = remaining_validators
.keys()
.filter(|k| leader_records.get(k).map(|r| r.healthy()).unwrap_or(false))
.collect();
let unhealthy: Vec<_> = remaining_validators
.keys()
.filter(|k| leader_records.get(k).map(|r| !r.healthy()).unwrap_or(true))
.collect();
let inactive: Vec<_> = starting_validators
.keys()
.filter(|k| !remaining_validators.contains_key(k))
.collect();
notifier.buffer("Healthy Validators:".to_string());
buffer_records(healthy, notifier);
notifier.buffer("Unhealthy Validators:".to_string());
buffer_records(unhealthy, notifier);
notifier.buffer("Inactive Validators:".to_string());
buffer_records(inactive, notifier);
notifier.flush();
}
/// Award stake to the surviving validators by delegating stake to their vote account
pub fn award_stake(
rpc_client: &RpcClient,
faucet_keypair: &Keypair,
voters: Vec<(String, &Pubkey)>,
sol_gift: u64,
notifier: &mut Notifier,
) {
for (node_pubkey, vote_account_pubkey) in voters {
info!("Delegate {} SOL to {}", sol_gift, node_pubkey);
delegate_stake(rpc_client, faucet_keypair, vote_account_pubkey, sol_gift);
notifier.buffer(format!("Delegated {} SOL to {}", sol_gift, node_pubkey));
}
notifier.flush();
}