Add blockstore column to store performance sampling data (#12251)

* Add blockstore column to store performance sampling data

* introduce timer and write performance metrics to blockstore

* introduce getRecentPerformanceSamples rpc

* only run on rpc nodes enabled with transaction history

* add unit tests for get_recent_performance_samples

* remove RpcResponse from rpc call

* refactor to use Instant::now and elapsed for timer

* switch to root bank and ensure not negative subraction

* Add PerfSamples to purge/compaction

* refactor to use Instant::now and elapsed for timer

* switch to root bank and ensure not negative subraction

* remove duplicate constants

Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
Josh 2020-09-22 12:26:32 -07:00 committed by GitHub
parent afd9bfc45f
commit 65a6bfad09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 312 additions and 3 deletions

View File

@ -16,6 +16,7 @@ pub mod cluster_info_vote_listener;
pub mod commitment_service;
pub mod completed_data_sets_service;
mod deprecated;
pub mod sample_performance_service;
pub mod shred_fetch_stage;
#[macro_use]
pub mod contact_info;

View File

@ -33,7 +33,10 @@ use solana_client::{
rpc_response::*,
};
use solana_faucet::faucet::request_airdrop_transaction;
use solana_ledger::{blockstore::Blockstore, blockstore_db::BlockstoreError, get_tmp_ledger_path};
use solana_ledger::{
blockstore::Blockstore, blockstore_db::BlockstoreError, blockstore_meta::PerfSample,
get_tmp_ledger_path,
};
use solana_perf::packet::PACKET_DATA_SIZE;
use solana_runtime::{
accounts::AccountAddressFilter,
@ -79,6 +82,7 @@ use std::{
use tokio::runtime;
pub const MAX_REQUEST_PAYLOAD_SIZE: usize = 50 * (1 << 10); // 50kB
pub const PERFORMANCE_SAMPLES_LIMIT: usize = 720;
fn new_response<T>(bank: &Bank, value: T) -> RpcResponse<T> {
let context = RpcResponseContext { slot: bank.slot() };
@ -1496,6 +1500,13 @@ pub trait RpcSol {
#[rpc(meta, name = "getClusterNodes")]
fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result<Vec<RpcContactInfo>>;
#[rpc(meta, name = "getRecentPerformanceSamples")]
fn get_recent_performance_samples(
&self,
meta: Self::Metadata,
limit: Option<usize>,
) -> Result<Vec<(Slot, PerfSample)>>;
#[rpc(meta, name = "getEpochInfo")]
fn get_epoch_info(
&self,
@ -1885,6 +1896,30 @@ impl RpcSol for RpcSolImpl {
Ok(meta.get_balance(&pubkey, commitment))
}
fn get_recent_performance_samples(
&self,
meta: Self::Metadata,
limit: Option<usize>,
) -> Result<Vec<(Slot, PerfSample)>> {
debug!("get_recent_performance_samples request received");
let limit = limit.unwrap_or(PERFORMANCE_SAMPLES_LIMIT);
if limit > PERFORMANCE_SAMPLES_LIMIT {
return Err(Error::invalid_params(format!(
"Invalid limit; max {}",
PERFORMANCE_SAMPLES_LIMIT
)));
}
meta.blockstore
.get_recent_perf_samples(limit)
.map_err(|err| {
warn!("get_recent_performance_samples failed: {:?}", err);
Error::invalid_request()
})
}
fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result<Vec<RpcContactInfo>> {
debug!("get_cluster_nodes rpc request received");
let cluster_info = &meta.cluster_info;
@ -2670,6 +2705,16 @@ pub mod tests {
&socketaddr!("127.0.0.1:1234"),
));
let sample1 = PerfSample {
num_slots: 1,
num_transactions: 4,
sample_period_secs: 60,
};
blockstore
.write_perf_sample(0, &sample1)
.expect("write to blockstore");
let (meta, receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig {
enable_rpc_transaction_history: true,
@ -2797,6 +2842,63 @@ pub mod tests {
assert_eq!(expected, result);
}
#[test]
fn test_rpc_get_recent_performance_samples() {
let bob_pubkey = Pubkey::new_rand();
let RpcHandler { io, meta, .. } = start_rpc_handler_with_tx(&bob_pubkey);
let req = r#"{"jsonrpc":"2.0","id":1,"method":"getRecentPerformanceSamples"}"#;
let res = io.handle_request_sync(&req, meta);
let result: Response = serde_json::from_str(&res.expect("actual response"))
.expect("actual response deserialization");
let expected = json!({
"jsonrpc": "2.0",
"id": 1,
"result": [
[
0,
{
"num_slots": 1,
"num_transactions": 4,
"sample_period_secs": 60
}
]
],
});
let expected: Response =
serde_json::from_value(expected).expect("expected response deserialization");
assert_eq!(expected, result);
}
#[test]
fn test_rpc_get_recent_performance_samples_invalid_limit() {
let bob_pubkey = Pubkey::new_rand();
let RpcHandler { io, meta, .. } = start_rpc_handler_with_tx(&bob_pubkey);
let req =
r#"{"jsonrpc":"2.0","id":1,"method":"getRecentPerformanceSamples","params":[10000]}"#;
let res = io.handle_request_sync(&req, meta);
let result: Response = serde_json::from_str(&res.expect("actual response"))
.expect("actual response deserialization");
let expected = json!({
"jsonrpc": "2.0",
"error": {
"code": -32602,
"message": "Invalid limit; max 720"
},
"id": 1
});
let expected: Response =
serde_json::from_value(expected).expect("expected response deserialization");
assert_eq!(expected, result);
}
#[test]
fn test_rpc_get_slot_leader() {
let bob_pubkey = Pubkey::new_rand();

View File

@ -0,0 +1,104 @@
use solana_ledger::{blockstore::Blockstore, blockstore_meta::PerfSample};
use solana_runtime::bank_forks::BankForks;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
time::{Duration, Instant},
};
const SAMPLE_INTERVAL: u64 = 60;
const SLEEP_INTERVAL: u64 = 500;
pub struct SamplePerformanceSnapshot {
pub num_transactions: u64,
pub num_slots: u64,
}
pub struct SamplePerformanceService {
thread_hdl: JoinHandle<()>,
}
impl SamplePerformanceService {
#[allow(clippy::new_ret_no_self)]
pub fn new(
bank_forks: &Arc<RwLock<BankForks>>,
blockstore: &Arc<Blockstore>,
exit: &Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let blockstore = blockstore.clone();
let bank_forks = bank_forks.clone();
info!("Starting SamplePerformance service");
let thread_hdl = Builder::new()
.name("sample-performance".to_string())
.spawn(move || {
Self::run(bank_forks, &blockstore, exit);
})
.unwrap();
Self { thread_hdl }
}
pub fn run(
bank_forks: Arc<RwLock<BankForks>>,
blockstore: &Arc<Blockstore>,
exit: Arc<AtomicBool>,
) {
let forks = bank_forks.read().unwrap();
let bank = forks.root_bank().clone();
let highest_slot = forks.highest_slot();
drop(forks);
let mut sample_snapshot = SamplePerformanceSnapshot {
num_transactions: bank.transaction_count(),
num_slots: highest_slot,
};
let mut now = Instant::now();
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let elapsed = now.elapsed();
if elapsed.as_secs() >= SAMPLE_INTERVAL {
now = Instant::now();
let bank_forks = bank_forks.read().unwrap();
let bank = bank_forks.root_bank().clone();
let highest_slot = bank_forks.highest_slot();
drop(bank_forks);
let perf_sample = PerfSample {
num_slots: highest_slot
.checked_sub(sample_snapshot.num_slots)
.unwrap_or_default(),
num_transactions: bank
.transaction_count()
.checked_sub(sample_snapshot.num_transactions)
.unwrap_or_default(),
sample_period_secs: elapsed.as_secs() as u16,
};
if let Err(e) = blockstore.write_perf_sample(highest_slot, &perf_sample) {
error!("write_perf_sample failed: slot {:?} {:?}", highest_slot, e);
}
sample_snapshot = SamplePerformanceSnapshot {
num_transactions: bank.transaction_count(),
num_slots: highest_slot,
};
}
sleep(Duration::from_millis(SLEEP_INTERVAL));
}
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -16,6 +16,7 @@ use crate::{
rpc_pubsub_service::PubSubService,
rpc_service::JsonRpcService,
rpc_subscriptions::RpcSubscriptions,
sample_performance_service::SamplePerformanceService,
serve_repair::ServeRepair,
serve_repair_service::ServeRepairService,
sigverify,
@ -169,6 +170,7 @@ pub struct Validator {
transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_service: Option<RewardsRecorderService>,
cache_block_time_service: Option<CacheBlockTimeService>,
sample_performance_service: Option<SamplePerformanceService>,
gossip_service: GossipService,
serve_repair_service: ServeRepairService,
completed_data_sets_service: CompletedDataSetsService,
@ -279,6 +281,17 @@ impl Validator {
let bank = bank_forks.working_bank();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let sample_performance_service =
if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
Some(SamplePerformanceService::new(
&bank_forks,
&blockstore,
&exit,
))
} else {
None
};
info!("Starting validator with working bank slot {}", bank.slot());
{
let hard_forks: Vec<_> = bank.hard_forks().read().unwrap().iter().copied().collect();
@ -554,6 +567,7 @@ impl Validator {
transaction_status_service,
rewards_recorder_service,
cache_block_time_service,
sample_performance_service,
snapshot_packager_service,
completed_data_sets_service,
tpu,
@ -622,6 +636,10 @@ impl Validator {
cache_block_time_service.join()?;
}
if let Some(sample_performance_service) = self.sample_performance_service {
sample_performance_service.join()?;
}
if let Some(s) = self.snapshot_packager_service {
s.join()?;
}

View File

@ -137,6 +137,7 @@ pub struct Blockstore {
active_transaction_status_index: RwLock<u64>,
rewards_cf: LedgerColumn<cf::Rewards>,
blocktime_cf: LedgerColumn<cf::Blocktime>,
perf_samples_cf: LedgerColumn<cf::PerfSamples>,
last_root: Arc<RwLock<Slot>>,
insert_shreds_lock: Arc<Mutex<()>>,
pub new_shreds_signals: Vec<SyncSender<bool>>,
@ -293,6 +294,7 @@ impl Blockstore {
let transaction_status_index_cf = db.column();
let rewards_cf = db.column();
let blocktime_cf = db.column();
let perf_samples_cf = db.column();
let db = Arc::new(db);
@ -338,6 +340,7 @@ impl Blockstore {
active_transaction_status_index: RwLock::new(active_transaction_status_index),
rewards_cf,
blocktime_cf,
perf_samples_cf,
new_shreds_signals: vec![],
completed_slots_senders: vec![],
insert_shreds_lock: Arc::new(Mutex::new(())),
@ -2303,6 +2306,22 @@ impl Blockstore {
.collect())
}
pub fn get_recent_perf_samples(&self, num: usize) -> Result<Vec<(Slot, PerfSample)>> {
Ok(self
.db
.iter::<cf::PerfSamples>(IteratorMode::End)?
.take(num)
.map(|(slot, data)| {
let perf_sample = deserialize(&data).unwrap();
(slot, perf_sample)
})
.collect())
}
pub fn write_perf_sample(&self, index: Slot, perf_sample: &PerfSample) -> Result<()> {
self.perf_samples_cf.put(index, perf_sample)
}
/// Returns the entry vector for the slot starting with `shred_start_index`
pub fn get_slot_entries(&self, slot: Slot, shred_start_index: u64) -> Result<Vec<Entry>> {
self.get_slot_entries_with_shred_info(slot, shred_start_index, false)
@ -6930,6 +6949,38 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_write_get_perf_samples() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let num_entries: usize = 10;
let mut perf_samples: Vec<(Slot, PerfSample)> = vec![];
for x in 1..num_entries + 1 {
perf_samples.push((
x as u64 * 50,
PerfSample {
num_transactions: 1000 + x as u64,
num_slots: 50,
sample_period_secs: 20,
},
));
}
for (slot, sample) in perf_samples.iter() {
blockstore.write_perf_sample(*slot, sample).unwrap();
}
for x in 0..num_entries {
let mut expected_samples = perf_samples[num_entries - 1 - x..].to_vec();
expected_samples.sort_by(|a, b| b.0.cmp(&a.0));
assert_eq!(
blockstore.get_recent_perf_samples(x + 1).unwrap(),
expected_samples
);
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_lowest_slot() {
let blockstore_path = get_tmp_ledger_path!();

View File

@ -137,6 +137,10 @@ impl Blockstore {
& self
.db
.delete_range_cf::<cf::Blocktime>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::PerfSamples>(&mut write_batch, from_slot, to_slot)
.is_ok();
let mut w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap();
@ -231,6 +235,10 @@ impl Blockstore {
&& self
.blocktime_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.perf_samples_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false);
compact_timer.stop();
if !result {

View File

@ -52,6 +52,8 @@ const TRANSACTION_STATUS_INDEX_CF: &str = "transaction_status_index";
const REWARDS_CF: &str = "rewards";
/// Column family for Blocktime
const BLOCKTIME_CF: &str = "blocktime";
/// Column family for Performance Samples
const PERF_SAMPLES_CF: &str = "perf_samples";
#[derive(Error, Debug)]
pub enum BlockstoreError {
@ -140,6 +142,10 @@ pub mod columns {
#[derive(Debug)]
/// The blocktime column
pub struct Blocktime;
#[derive(Debug)]
/// The performance samples column
pub struct PerfSamples;
}
pub enum AccessType {
@ -200,7 +206,7 @@ impl Rocks {
) -> Result<Rocks> {
use columns::{
AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans,
Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
TransactionStatusIndex,
};
@ -236,6 +242,8 @@ impl Rocks {
let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options());
let blocktime_cf_descriptor =
ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options());
let perf_samples_cf_descriptor =
ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options());
let cfs = vec![
(SlotMeta::NAME, meta_cf_descriptor),
@ -255,6 +263,7 @@ impl Rocks {
),
(Rewards::NAME, rewards_cf_descriptor),
(Blocktime::NAME, blocktime_cf_descriptor),
(PerfSamples::NAME, perf_samples_cf_descriptor),
];
// Open the database
@ -293,7 +302,7 @@ impl Rocks {
fn columns(&self) -> Vec<&'static str> {
use columns::{
AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans,
Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
TransactionStatusIndex,
};
@ -312,6 +321,7 @@ impl Rocks {
TransactionStatusIndex::NAME,
Rewards::NAME,
Blocktime::NAME,
PerfSamples::NAME,
]
}
@ -545,6 +555,14 @@ impl TypedColumn for columns::Blocktime {
type Type = UnixTimestamp;
}
impl SlotColumn for columns::PerfSamples {}
impl ColumnName for columns::PerfSamples {
const NAME: &'static str = PERF_SAMPLES_CF;
}
impl TypedColumn for columns::PerfSamples {
type Type = blockstore_meta::PerfSample;
}
impl Column for columns::ShredCode {
type Index = (u64, u64);

View File

@ -243,6 +243,13 @@ pub struct AddressSignatureMeta {
pub writeable: bool,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct PerfSample {
pub num_transactions: u64,
pub num_slots: u64,
pub sample_period_secs: u16,
}
#[cfg(test)]
mod test {
use super::*;