diff --git a/Cargo.lock b/Cargo.lock index a2c049a8ab..d830457256 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3714,9 +3714,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.59" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcac07dbffa1c65e7f816ab9eba78eb142c6d44410f4eeba1e26e4f5dfa56b95" +checksum = "ea1c6153794552ea7cf7cf63b1231a25de00ec90db326ba6264440fa08e31486" dependencies = [ "itoa", "ryu", @@ -5236,6 +5236,8 @@ dependencies = [ "log 0.4.11", "reqwest 0.10.8", "semver 0.9.0", + "serde", + "serde_json", "serde_yaml", "solana-clap-utils", "solana-cli-config", diff --git a/stake-o-matic/Cargo.toml b/stake-o-matic/Cargo.toml index f9da2257f6..1814c38558 100644 --- a/stake-o-matic/Cargo.toml +++ b/stake-o-matic/Cargo.toml @@ -13,6 +13,8 @@ clap = "2.33.0" log = "0.4.11" reqwest = { version = "0.10.8", default-features = false, features = ["blocking", "rustls-tls", "json"] } semver = "0.9.0" +serde = { version = "1.0.122", features = ["derive"] } +serde_json = "1.0.62" serde_yaml = "0.8.13" solana-clap-utils = { path = "../clap-utils", version = "1.6.0" } solana-client = { path = "../client", version = "1.6.0" } diff --git a/stake-o-matic/src/confirmed_block_cache.rs b/stake-o-matic/src/confirmed_block_cache.rs new file mode 100644 index 0000000000..b7dd6c6a38 --- /dev/null +++ b/stake-o-matic/src/confirmed_block_cache.rs @@ -0,0 +1,218 @@ +use crate::retry_rpc_operation; +use log::*; +use serde::{Deserialize, Serialize}; +use solana_client::rpc_client::RpcClient; +use solana_sdk::{clock::Slot, commitment_config::CommitmentConfig, epoch_info::EpochInfo}; +use std::{ + cell::RefCell, + fs::{self, File, OpenOptions}, + io, + ops::Range, + path::{Path, PathBuf}, +}; + +#[derive(Clone, Debug, Default)] +struct Entry { + slots: Range, + path: PathBuf, +} + +impl Entry { + pub fn new>(base_path: P, slots: Range) -> Self { + let file_name = format!("{}-{}.json", slots.start, slots.end); + let path = base_path.as_ref().join(file_name); + Self { slots, path } + } + + fn parse_filename>(filename: F) -> Option> { + let filename = filename.as_ref(); + let slot_range = filename.file_stem(); + let extension = filename.extension(); + extension + .zip(slot_range) + .and_then(|(extension, slot_range)| { + if extension == "json" { + slot_range.to_str() + } else { + None + } + }) + .and_then(|slot_range| { + let mut parts = slot_range.splitn(2, '-'); + let start = parts.next().and_then(|p| p.parse::().ok()); + let end = parts.next().and_then(|p| p.parse::().ok()); + start.zip(end).map(|(start, end)| start..end) + }) + } + + pub fn from_pathbuf(path: PathBuf) -> Option { + path.file_name() + .and_then(|n| n.to_str()) + .and_then(Self::parse_filename) + .map(|slots| Self { slots, path }) + } + + pub fn path(&self) -> &Path { + &self.path + } +} + +const CACHE_VERSION: u64 = 0; +const DEFAULT_SLOTS_PER_ENTRY: u64 = 2500; +const CONFIG_FILENAME: &str = "config.yaml"; + +#[derive(Debug, Deserialize, Serialize)] +struct Config { + version: u64, + slots_per_chunk: u64, +} + +impl Default for Config { + fn default() -> Self { + Self { + version: CACHE_VERSION, + slots_per_chunk: DEFAULT_SLOTS_PER_ENTRY, + } + } +} + +pub struct ConfirmedBlockCache { + rpc_client: RpcClient, + base_path: PathBuf, + entries: RefCell>, + config: Config, +} + +impl ConfirmedBlockCache { + fn store_config>(config_path: P, config: &Config) -> io::Result<()> { + let config_path = config_path.as_ref(); + let file = File::create(config_path)?; + serde_yaml::to_writer(file, config).map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!( + "error: cannot store config `{}`: {:?}", + config_path.to_string_lossy(), + e, + ), + ) + }) + } + + fn load_config>(config_path: P) -> io::Result { + let config_path = config_path.as_ref(); + let file = File::open(config_path)?; + serde_yaml::from_reader(file).map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!( + "error: cannot load config `{}`: {:?}", + config_path.to_string_lossy(), + e, + ), + ) + }) + } + + pub fn open, U: AsRef>(path: P, rpc_url: U) -> io::Result { + let path = path.as_ref(); + let config_path = path.join(CONFIG_FILENAME); + let rpc_url = rpc_url.as_ref(); + let (config, entries) = match fs::read_dir(path) { + Ok(dir_entries) => { + let config = Self::load_config(&config_path)?; + if config.version != CACHE_VERSION { + return Err(io::Error::new( + io::ErrorKind::Other, + "unexpected cache version", + )); + } + let mut entries = dir_entries + .filter_map(|de| Entry::from_pathbuf(de.unwrap().path())) + .collect::>(); + entries.sort_by(|l, r| l.slots.start.cmp(&r.slots.start)); + Ok((config, entries)) + } + Err(err) => { + if err.kind() == io::ErrorKind::NotFound { + let config = Config::default(); + fs::create_dir_all(path)?; + Self::store_config(config_path, &config)?; + Ok((config, Vec::new())) + } else { + Err(err) + } + } + }?; + Ok(Self { + rpc_client: RpcClient::new(rpc_url.to_string()), + base_path: path.to_path_buf(), + entries: RefCell::new(entries), + config, + }) + } + + fn lookup(&self, start: Slot) -> Option { + let entries = self.entries.borrow(); + for i in entries.iter() { + if i.slots.start == start { + debug!("HIT: {}", start); + return Some(i.clone()); + } + } + debug!("MISS: {}", start); + None + } + + fn fetch(&self, start: Slot, end: Slot, epoch_info: &EpochInfo) -> io::Result> { + debug!("fetching slot range: {}..{}", start, end); + // Fingers crossed we hit the same RPC backend... + let slots = retry_rpc_operation(42, || { + self.rpc_client.get_confirmed_blocks(start, Some(end)) + }) + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))?; + + // Only cache complete chunks + if end + self.config.slots_per_chunk < epoch_info.absolute_slot { + debug!("committing entry for slots {}..{}", start, end); + let entry = Entry::new(&self.base_path, start..end); + let file = OpenOptions::new() + .write(true) + .create_new(true) + .open(entry.path())?; + serde_json::to_writer(file, &slots)?; + + self.entries.borrow_mut().push(entry); + } + + Ok(slots) + } + + pub fn query(&self, start: Slot, end: Slot) -> io::Result> { + let chunk_size = self.config.slots_per_chunk; + let mut chunk_start = (start / chunk_size) * chunk_size; + let mut slots = Vec::new(); + let epoch_info = self + .rpc_client + .get_epoch_info_with_commitment(CommitmentConfig::finalized()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))?; + let last_slot = end.min(epoch_info.absolute_slot); + while chunk_start < last_slot { + let mut chunk_slots = if let Some(entry) = self.lookup(chunk_start) { + let file = File::open(entry.path())?; + serde_json::from_reader(file)? + } else { + let chunk_end = chunk_start + chunk_size - 1; + self.fetch(chunk_start, chunk_end, &epoch_info)? + }; + slots.append(&mut chunk_slots); + chunk_start += chunk_size; + } + let slots = slots + .drain(..) + .skip_while(|s| *s < start) + .take_while(|s| *s <= end) + .collect::>(); + Ok(slots) + } +} diff --git a/stake-o-matic/src/main.rs b/stake-o-matic/src/main.rs index 2accf82e81..3547077896 100644 --- a/stake-o-matic/src/main.rs +++ b/stake-o-matic/src/main.rs @@ -42,8 +42,11 @@ use { }, }; +mod confirmed_block_cache; mod validator_list; +use confirmed_block_cache::ConfirmedBlockCache; + pub fn is_release_version(string: String) -> Result<(), String> { if string.starts_with('v') && semver::Version::parse(string.split_at(1).1).is_ok() { return Ok(()); @@ -110,9 +113,21 @@ struct Config { /// Don't ever unstake more than this percentage of the cluster at one time for running an /// older software version max_old_release_version_percentage: usize, + + /// Base path of confirmed block cache + confirmed_block_cache_path: PathBuf, +} + +fn default_confirmed_block_cache_path() -> PathBuf { + let home_dir = std::env::var("HOME").unwrap(); + PathBuf::from(home_dir).join(".cache/solana/som/confirmed-block-cache/") } fn get_config() -> Config { + let default_confirmed_block_cache_path = default_confirmed_block_cache_path() + .to_str() + .unwrap() + .to_string(); let matches = App::new(crate_name!()) .about(crate_description!()) .version(crate_version!()) @@ -241,6 +256,14 @@ fn get_config() -> Config { software versions if more than this percentage of \ all validators are running an older software version") ) + .arg( + Arg::with_name("confirmed_block_cache_path") + .long("confirmed-block-cache-path") + .takes_value(true) + .value_name("PATH") + .default_value(&default_confirmed_block_cache_path) + .help("Base path of confirmed block cache") + ) .get_matches(); let config = if let Some(config_file) = matches.value_of("config_file") { @@ -306,6 +329,10 @@ fn get_config() -> Config { _ => unreachable!(), }; let validator_list = validator_list.into_iter().collect::>(); + let confirmed_block_cache_path = matches + .value_of("confirmed_block_cache_path") + .map(PathBuf::from) + .unwrap(); let config = Config { json_rpc_url, @@ -323,6 +350,7 @@ fn get_config() -> Config { address_labels: config.address_labels, min_release_version, max_old_release_version_percentage, + confirmed_block_cache_path, }; info!("RPC URL: {}", config.json_rpc_url); @@ -360,7 +388,7 @@ fn get_stake_account( .map(|stake_state| (account.lamports, stake_state)) } -fn retry_rpc_operation(mut retries: usize, op: F) -> client_error::Result +pub fn retry_rpc_operation(mut retries: usize, op: F) -> client_error::Result where F: Fn() -> client_error::Result, { @@ -420,34 +448,12 @@ fn classify_block_producers( let leader_schedule = rpc_client.get_leader_schedule(Some(first_slot))?.unwrap(); - let mut confirmed_blocks = vec![]; - // Fetching a large number of blocks from BigTable can cause timeouts, break up the requests - const LONGTERM_STORAGE_STEP: u64 = 5_000; - let mut next_slot = first_slot; - while next_slot < last_slot_in_epoch { - let last_slot = if next_slot >= minimum_ledger_slot { - last_slot_in_epoch - } else { - last_slot_in_epoch.min(next_slot + LONGTERM_STORAGE_STEP) - }; - let slots_remaining = last_slot_in_epoch - last_slot; - info!( - "Fetching confirmed blocks between {} - {}{}", - next_slot, - last_slot, - if slots_remaining > 0 { - format!(" ({} remaining)", slots_remaining) - } else { - "".to_string() - } - ); - - confirmed_blocks.push(retry_rpc_operation(42, || { - rpc_client.get_confirmed_blocks(next_slot, Some(last_slot)) - })?); - next_slot = last_slot + 1; - } - let confirmed_blocks: HashSet = confirmed_blocks.into_iter().flatten().collect(); + let cache_path = config.confirmed_block_cache_path.join(&config.cluster); + let cbc = ConfirmedBlockCache::open(cache_path, &config.json_rpc_url).unwrap(); + let confirmed_blocks = cbc + .query(first_slot, last_slot_in_epoch)? + .into_iter() + .collect::>(); let mut poor_block_producers = HashSet::new(); let mut quality_block_producers = HashSet::new();