SoM: Cache confirmed blocks queries

This commit is contained in:
Trent Nelson 2021-02-17 01:58:45 -07:00 committed by Trent Nelson
parent 513ec31d1e
commit 985ce29dc6
4 changed files with 259 additions and 31 deletions

6
Cargo.lock generated
View File

@ -3714,9 +3714,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.59"
version = "1.0.62"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcac07dbffa1c65e7f816ab9eba78eb142c6d44410f4eeba1e26e4f5dfa56b95"
checksum = "ea1c6153794552ea7cf7cf63b1231a25de00ec90db326ba6264440fa08e31486"
dependencies = [
"itoa",
"ryu",
@ -5236,6 +5236,8 @@ dependencies = [
"log 0.4.11",
"reqwest 0.10.8",
"semver 0.9.0",
"serde",
"serde_json",
"serde_yaml",
"solana-clap-utils",
"solana-cli-config",

View File

@ -13,6 +13,8 @@ clap = "2.33.0"
log = "0.4.11"
reqwest = { version = "0.10.8", default-features = false, features = ["blocking", "rustls-tls", "json"] }
semver = "0.9.0"
serde = { version = "1.0.122", features = ["derive"] }
serde_json = "1.0.62"
serde_yaml = "0.8.13"
solana-clap-utils = { path = "../clap-utils", version = "1.6.0" }
solana-client = { path = "../client", version = "1.6.0" }

View File

@ -0,0 +1,218 @@
use crate::retry_rpc_operation;
use log::*;
use serde::{Deserialize, Serialize};
use solana_client::rpc_client::RpcClient;
use solana_sdk::{clock::Slot, commitment_config::CommitmentConfig, epoch_info::EpochInfo};
use std::{
cell::RefCell,
fs::{self, File, OpenOptions},
io,
ops::Range,
path::{Path, PathBuf},
};
// A single cache entry: one contiguous slot range and the on-disk JSON file
// holding the confirmed slots fetched for that range.
#[derive(Clone, Debug, Default)]
struct Entry {
    // Slot range covered by this entry. NOTE(review): `fetch()` passes an
    // inclusive chunk end here even though `Range` is half-open; the value is
    // only round-tripped through the file name and matched on `start`, so it
    // is internally consistent — confirm before treating `slots.end` as
    // exclusive.
    slots: Range<Slot>,
    // Backing file path, named "<start>-<end>.json" under the cache base dir.
    path: PathBuf,
}
impl Entry {
    /// Builds an entry rooted at `base_path`, deriving the backing file name
    /// "<start>-<end>.json" from the bounds of `slots`.
    pub fn new<P: AsRef<Path>>(base_path: P, slots: Range<Slot>) -> Self {
        let path = base_path
            .as_ref()
            .join(format!("{}-{}.json", slots.start, slots.end));
        Self { slots, path }
    }

    /// Recovers the slot range encoded in a cache file name. Returns `None`
    /// unless the name has a "json" extension and a stem of the form
    /// "<start>-<end>" where both bounds parse as slots.
    fn parse_filename<F: AsRef<Path>>(filename: F) -> Option<Range<Slot>> {
        let filename = filename.as_ref();
        if filename.extension()? != "json" {
            return None;
        }
        let stem = filename.file_stem()?.to_str()?;
        let mut parts = stem.splitn(2, '-');
        let start = parts.next()?.parse::<Slot>().ok()?;
        let end = parts.next()?.parse::<Slot>().ok()?;
        Some(start..end)
    }

    /// Reconstructs an `Entry` from an existing cache file path, if its file
    /// name follows the cache naming scheme.
    pub fn from_pathbuf(path: PathBuf) -> Option<Self> {
        let slots = path
            .file_name()
            .and_then(|name| name.to_str())
            .and_then(Self::parse_filename)?;
        Some(Self { slots, path })
    }

    /// Location of the JSON file backing this entry.
    pub fn path(&self) -> &Path {
        &self.path
    }
}
// On-disk format version; `ConfirmedBlockCache::open` rejects caches whose
// stored version differs, forcing a manual wipe on incompatible changes.
const CACHE_VERSION: u64 = 0;
// Default slot-range width of a single cache entry file.
const DEFAULT_SLOTS_PER_ENTRY: u64 = 2500;
// Name of the YAML config file stored alongside the entry files.
const CONFIG_FILENAME: &str = "config.yaml";
// Persistent cache configuration, serialized as YAML into CONFIG_FILENAME.
#[derive(Debug, Deserialize, Serialize)]
struct Config {
    // Cache format version; must equal CACHE_VERSION for the cache to load.
    version: u64,
    // Slot-range width of each cached chunk file.
    slots_per_chunk: u64,
}
impl Default for Config {
    // A fresh cache starts at the current format version with the default
    // chunk width.
    fn default() -> Self {
        Self {
            version: CACHE_VERSION,
            slots_per_chunk: DEFAULT_SLOTS_PER_ENTRY,
        }
    }
}
/// File-backed cache of `get_confirmed_blocks` RPC responses, chunked into
/// fixed-width slot ranges so repeated queries avoid re-hitting the RPC node.
pub struct ConfirmedBlockCache {
    // Client used to fetch slot ranges not yet present in the cache.
    rpc_client: RpcClient,
    // Directory holding the config file plus one JSON file per cached chunk.
    base_path: PathBuf,
    // In-memory index of cached chunks; RefCell because `query()` takes
    // `&self` but commits new entries as chunks are fetched.
    entries: RefCell<Vec<Entry>>,
    config: Config,
}
impl ConfirmedBlockCache {
fn store_config<P: AsRef<Path>>(config_path: P, config: &Config) -> io::Result<()> {
let config_path = config_path.as_ref();
let file = File::create(config_path)?;
serde_yaml::to_writer(file, config).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!(
"error: cannot store config `{}`: {:?}",
config_path.to_string_lossy(),
e,
),
)
})
}
fn load_config<P: AsRef<Path>>(config_path: P) -> io::Result<Config> {
let config_path = config_path.as_ref();
let file = File::open(config_path)?;
serde_yaml::from_reader(file).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!(
"error: cannot load config `{}`: {:?}",
config_path.to_string_lossy(),
e,
),
)
})
}
pub fn open<P: AsRef<Path>, U: AsRef<str>>(path: P, rpc_url: U) -> io::Result<Self> {
let path = path.as_ref();
let config_path = path.join(CONFIG_FILENAME);
let rpc_url = rpc_url.as_ref();
let (config, entries) = match fs::read_dir(path) {
Ok(dir_entries) => {
let config = Self::load_config(&config_path)?;
if config.version != CACHE_VERSION {
return Err(io::Error::new(
io::ErrorKind::Other,
"unexpected cache version",
));
}
let mut entries = dir_entries
.filter_map(|de| Entry::from_pathbuf(de.unwrap().path()))
.collect::<Vec<_>>();
entries.sort_by(|l, r| l.slots.start.cmp(&r.slots.start));
Ok((config, entries))
}
Err(err) => {
if err.kind() == io::ErrorKind::NotFound {
let config = Config::default();
fs::create_dir_all(path)?;
Self::store_config(config_path, &config)?;
Ok((config, Vec::new()))
} else {
Err(err)
}
}
}?;
Ok(Self {
rpc_client: RpcClient::new(rpc_url.to_string()),
base_path: path.to_path_buf(),
entries: RefCell::new(entries),
config,
})
}
fn lookup(&self, start: Slot) -> Option<Entry> {
let entries = self.entries.borrow();
for i in entries.iter() {
if i.slots.start == start {
debug!("HIT: {}", start);
return Some(i.clone());
}
}
debug!("MISS: {}", start);
None
}
fn fetch(&self, start: Slot, end: Slot, epoch_info: &EpochInfo) -> io::Result<Vec<Slot>> {
debug!("fetching slot range: {}..{}", start, end);
// Fingers crossed we hit the same RPC backend...
let slots = retry_rpc_operation(42, || {
self.rpc_client.get_confirmed_blocks(start, Some(end))
})
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))?;
// Only cache complete chunks
if end + self.config.slots_per_chunk < epoch_info.absolute_slot {
debug!("committing entry for slots {}..{}", start, end);
let entry = Entry::new(&self.base_path, start..end);
let file = OpenOptions::new()
.write(true)
.create_new(true)
.open(entry.path())?;
serde_json::to_writer(file, &slots)?;
self.entries.borrow_mut().push(entry);
}
Ok(slots)
}
pub fn query(&self, start: Slot, end: Slot) -> io::Result<Vec<Slot>> {
let chunk_size = self.config.slots_per_chunk;
let mut chunk_start = (start / chunk_size) * chunk_size;
let mut slots = Vec::new();
let epoch_info = self
.rpc_client
.get_epoch_info_with_commitment(CommitmentConfig::finalized())
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{:?}", e)))?;
let last_slot = end.min(epoch_info.absolute_slot);
while chunk_start < last_slot {
let mut chunk_slots = if let Some(entry) = self.lookup(chunk_start) {
let file = File::open(entry.path())?;
serde_json::from_reader(file)?
} else {
let chunk_end = chunk_start + chunk_size - 1;
self.fetch(chunk_start, chunk_end, &epoch_info)?
};
slots.append(&mut chunk_slots);
chunk_start += chunk_size;
}
let slots = slots
.drain(..)
.skip_while(|s| *s < start)
.take_while(|s| *s <= end)
.collect::<Vec<_>>();
Ok(slots)
}
}

View File

@ -42,8 +42,11 @@ use {
},
};
mod confirmed_block_cache;
mod validator_list;
use confirmed_block_cache::ConfirmedBlockCache;
pub fn is_release_version(string: String) -> Result<(), String> {
if string.starts_with('v') && semver::Version::parse(string.split_at(1).1).is_ok() {
return Ok(());
@ -110,9 +113,21 @@ struct Config {
/// Don't ever unstake more than this percentage of the cluster at one time for running an
/// older software version
max_old_release_version_percentage: usize,
/// Base path of confirmed block cache
confirmed_block_cache_path: PathBuf,
}
/// Default location of the confirmed block cache, rooted under the user's
/// home directory.
fn default_confirmed_block_cache_path() -> PathBuf {
    // Fail with a clear diagnostic instead of a bare `unwrap` panic when
    // HOME is unset (e.g. minimal service environments).
    let home_dir = std::env::var("HOME").expect("HOME environment variable not set");
    PathBuf::from(home_dir).join(".cache/solana/som/confirmed-block-cache/")
}
fn get_config() -> Config {
let default_confirmed_block_cache_path = default_confirmed_block_cache_path()
.to_str()
.unwrap()
.to_string();
let matches = App::new(crate_name!())
.about(crate_description!())
.version(crate_version!())
@ -241,6 +256,14 @@ fn get_config() -> Config {
software versions if more than this percentage of \
all validators are running an older software version")
)
.arg(
Arg::with_name("confirmed_block_cache_path")
.long("confirmed-block-cache-path")
.takes_value(true)
.value_name("PATH")
.default_value(&default_confirmed_block_cache_path)
.help("Base path of confirmed block cache")
)
.get_matches();
let config = if let Some(config_file) = matches.value_of("config_file") {
@ -306,6 +329,10 @@ fn get_config() -> Config {
_ => unreachable!(),
};
let validator_list = validator_list.into_iter().collect::<HashSet<_>>();
let confirmed_block_cache_path = matches
.value_of("confirmed_block_cache_path")
.map(PathBuf::from)
.unwrap();
let config = Config {
json_rpc_url,
@ -323,6 +350,7 @@ fn get_config() -> Config {
address_labels: config.address_labels,
min_release_version,
max_old_release_version_percentage,
confirmed_block_cache_path,
};
info!("RPC URL: {}", config.json_rpc_url);
@ -360,7 +388,7 @@ fn get_stake_account(
.map(|stake_state| (account.lamports, stake_state))
}
fn retry_rpc_operation<T, F>(mut retries: usize, op: F) -> client_error::Result<T>
pub fn retry_rpc_operation<T, F>(mut retries: usize, op: F) -> client_error::Result<T>
where
F: Fn() -> client_error::Result<T>,
{
@ -420,34 +448,12 @@ fn classify_block_producers(
let leader_schedule = rpc_client.get_leader_schedule(Some(first_slot))?.unwrap();
let mut confirmed_blocks = vec![];
// Fetching a large number of blocks from BigTable can cause timeouts, break up the requests
const LONGTERM_STORAGE_STEP: u64 = 5_000;
let mut next_slot = first_slot;
while next_slot < last_slot_in_epoch {
let last_slot = if next_slot >= minimum_ledger_slot {
last_slot_in_epoch
} else {
last_slot_in_epoch.min(next_slot + LONGTERM_STORAGE_STEP)
};
let slots_remaining = last_slot_in_epoch - last_slot;
info!(
"Fetching confirmed blocks between {} - {}{}",
next_slot,
last_slot,
if slots_remaining > 0 {
format!(" ({} remaining)", slots_remaining)
} else {
"".to_string()
}
);
confirmed_blocks.push(retry_rpc_operation(42, || {
rpc_client.get_confirmed_blocks(next_slot, Some(last_slot))
})?);
next_slot = last_slot + 1;
}
let confirmed_blocks: HashSet<Slot> = confirmed_blocks.into_iter().flatten().collect();
let cache_path = config.confirmed_block_cache_path.join(&config.cluster);
let cbc = ConfirmedBlockCache::open(cache_path, &config.json_rpc_url).unwrap();
let confirmed_blocks = cbc
.query(first_slot, last_slot_in_epoch)?
.into_iter()
.collect::<HashSet<_>>();
let mut poor_block_producers = HashSet::new();
let mut quality_block_producers = HashSet::new();