ledger/blockstore: PerfSampleV2: num_non_vote_transactions (#29404)

Store non-vote transaction counts that are now recorded by the banks
into the `blockstore`.

`SamplePerformanceService` now populates `PerfSampleV2` with counts from
the banks.
This commit is contained in:
Illia Bobyr 2023-01-12 19:14:04 -08:00 committed by GitHub
parent d32256d2af
commit 59fde130d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 279 additions and 57 deletions

View File

@ -1,5 +1,5 @@
use { use {
solana_ledger::{blockstore::Blockstore, blockstore_meta::PerfSample}, solana_ledger::{blockstore::Blockstore, blockstore_meta::PerfSampleV2},
solana_runtime::bank_forks::BankForks, solana_runtime::bank_forks::BankForks,
std::{ std::{
sync::{ sync::{
@ -16,7 +16,8 @@ const SLEEP_INTERVAL: u64 = 500;
pub struct SamplePerformanceSnapshot { pub struct SamplePerformanceSnapshot {
pub num_transactions: u64, pub num_transactions: u64,
pub num_slots: u64, pub num_non_vote_transactions: u64,
pub highest_slot: u64,
} }
pub struct SamplePerformanceService { pub struct SamplePerformanceService {
@ -50,14 +51,15 @@ impl SamplePerformanceService {
blockstore: &Arc<Blockstore>, blockstore: &Arc<Blockstore>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) { ) {
let forks = bank_forks.read().unwrap(); let (bank, highest_slot) = {
let bank = forks.root_bank(); let forks = bank_forks.read().unwrap();
let highest_slot = forks.highest_slot(); (forks.root_bank(), forks.highest_slot())
drop(forks); };
let mut sample_snapshot = SamplePerformanceSnapshot { let mut snapshot = SamplePerformanceSnapshot {
num_transactions: bank.transaction_count(), num_transactions: bank.transaction_count(),
num_slots: highest_slot, num_non_vote_transactions: bank.non_vote_transaction_count_since_restart(),
highest_slot,
}; };
let mut now = Instant::now(); let mut now = Instant::now();
@ -70,19 +72,23 @@ impl SamplePerformanceService {
if elapsed.as_secs() >= SAMPLE_INTERVAL { if elapsed.as_secs() >= SAMPLE_INTERVAL {
now = Instant::now(); now = Instant::now();
let bank_forks = bank_forks.read().unwrap(); let (bank, highest_slot) = {
let bank = bank_forks.root_bank().clone(); let bank_forks = bank_forks.read().unwrap();
let highest_slot = bank_forks.highest_slot(); (bank_forks.root_bank(), bank_forks.highest_slot())
drop(bank_forks); };
let perf_sample = PerfSample { let num_slots = highest_slot.saturating_sub(snapshot.highest_slot);
num_slots: highest_slot let num_transactions = bank
.checked_sub(sample_snapshot.num_slots) .transaction_count()
.unwrap_or_default(), .saturating_sub(snapshot.num_transactions);
num_transactions: bank let num_non_vote_transactions = bank
.transaction_count() .non_vote_transaction_count_since_restart()
.checked_sub(sample_snapshot.num_transactions) .saturating_sub(snapshot.num_non_vote_transactions);
.unwrap_or_default(),
let perf_sample = PerfSampleV2 {
num_slots,
num_transactions,
num_non_vote_transactions,
sample_period_secs: elapsed.as_secs() as u16, sample_period_secs: elapsed.as_secs() as u16,
}; };
@ -90,9 +96,10 @@ impl SamplePerformanceService {
error!("write_perf_sample failed: slot {:?} {:?}", highest_slot, e); error!("write_perf_sample failed: slot {:?} {:?}", highest_slot, e);
} }
sample_snapshot = SamplePerformanceSnapshot { snapshot = SamplePerformanceSnapshot {
num_transactions: bank.transaction_count(), num_transactions,
num_slots: highest_slot, num_non_vote_transactions,
highest_slot,
}; };
} }

View File

@ -23,7 +23,7 @@ use {
slot_stats::{ShredSource, SlotsStats}, slot_stats::{ShredSource, SlotsStats},
}, },
assert_matches::debug_assert_matches, assert_matches::debug_assert_matches,
bincode::deserialize, bincode::{deserialize, serialize},
crossbeam_channel::{bounded, Receiver, Sender, TrySendError}, crossbeam_channel::{bounded, Receiver, Sender, TrySendError},
dashmap::DashSet, dashmap::DashSet,
log::*, log::*,
@ -2712,19 +2712,40 @@ impl Blockstore {
} }
pub fn get_recent_perf_samples(&self, num: usize) -> Result<Vec<(Slot, PerfSample)>> { pub fn get_recent_perf_samples(&self, num: usize) -> Result<Vec<(Slot, PerfSample)>> {
Ok(self // When reading `PerfSamples`, the database may contain samples with either `PerfSampleV1`
// or `PerfSampleV2` encoding. We expect `PerfSampleV1` to be a prefix of the
// `PerfSampleV2` encoding (see [`perf_sample_v1_is_prefix_of_perf_sample_v2`]), so we try
// them in order.
let samples = self
.db .db
.iter::<cf::PerfSamples>(IteratorMode::End)? .iter::<cf::PerfSamples>(IteratorMode::End)?
.take(num) .take(num)
.map(|(slot, data)| { .map(|(slot, data)| {
let perf_sample = deserialize(&data).unwrap(); deserialize::<PerfSampleV2>(&data)
(slot, perf_sample) .map(|sample| (slot, sample.into()))
}) .or_else(|err| {
.collect()) match &*err {
bincode::ErrorKind::Io(io_err)
if matches!(io_err.kind(), ErrorKind::UnexpectedEof) =>
{
// Not enough bytes to deserialize as `PerfSampleV2`.
}
_ => return Err(err),
}
deserialize::<PerfSampleV1>(&data).map(|sample| (slot, sample.into()))
})
.map_err(Into::into)
});
samples.collect()
} }
pub fn write_perf_sample(&self, index: Slot, perf_sample: &PerfSample) -> Result<()> { pub fn write_perf_sample(&self, index: Slot, perf_sample: &PerfSampleV2) -> Result<()> {
self.perf_samples_cf.put(index, perf_sample) // Always write as the current version.
let bytes =
serialize(&perf_sample).expect("`PerfSampleV2` can be serialized with `bincode`");
self.perf_samples_cf.put_bytes(index, &bytes)
} }
pub fn read_program_costs(&self) -> Result<Vec<(Pubkey, u64)>> { pub fn read_program_costs(&self) -> Result<Vec<(Pubkey, u64)>> {
@ -8503,25 +8524,139 @@ pub mod tests {
} }
#[test] #[test]
fn test_write_get_perf_samples() { fn test_get_recent_perf_samples_v1_only() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let num_entries: usize = 10;
let slot_sample = |i: u64| PerfSampleV1 {
num_transactions: 1406 + i,
num_slots: 34 + i / 2,
sample_period_secs: (40 + i / 5) as u16,
};
let mut perf_samples: Vec<(Slot, PerfSample)> = vec![];
for i in 0..num_entries {
let slot = (i + 1) as u64 * 50;
let sample = slot_sample(i as u64);
let bytes = serialize(&sample).unwrap();
blockstore.perf_samples_cf.put_bytes(slot, &bytes).unwrap();
perf_samples.push((slot, sample.into()));
}
for i in 0..num_entries {
let mut expected_samples = perf_samples[num_entries - 1 - i..].to_vec();
expected_samples.sort_by(|a, b| b.0.cmp(&a.0));
assert_eq!(
blockstore.get_recent_perf_samples(i + 1).unwrap(),
expected_samples
);
}
}
#[test]
fn test_get_recent_perf_samples_v2_only() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let num_entries: usize = 10;
let slot_sample = |i: u64| PerfSampleV2 {
num_transactions: 2495 + i,
num_slots: 167 + i / 2,
sample_period_secs: (37 + i / 5) as u16,
num_non_vote_transactions: 1672 + i,
};
let mut perf_samples: Vec<(Slot, PerfSample)> = vec![];
for i in 0..num_entries {
let slot = (i + 1) as u64 * 50;
let sample = slot_sample(i as u64);
let bytes = serialize(&sample).unwrap();
blockstore.perf_samples_cf.put_bytes(slot, &bytes).unwrap();
perf_samples.push((slot, sample.into()));
}
for i in 0..num_entries {
let mut expected_samples = perf_samples[num_entries - 1 - i..].to_vec();
expected_samples.sort_by(|a, b| b.0.cmp(&a.0));
assert_eq!(
blockstore.get_recent_perf_samples(i + 1).unwrap(),
expected_samples
);
}
}
#[test]
fn test_get_recent_perf_samples_v1_and_v2() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let num_entries: usize = 10;
let slot_sample_v1 = |i: u64| PerfSampleV1 {
num_transactions: 1599 + i,
num_slots: 123 + i / 2,
sample_period_secs: (42 + i / 5) as u16,
};
let slot_sample_v2 = |i: u64| PerfSampleV2 {
num_transactions: 5809 + i,
num_slots: 81 + i / 2,
sample_period_secs: (35 + i / 5) as u16,
num_non_vote_transactions: 2209 + i,
};
let mut perf_samples: Vec<(Slot, PerfSample)> = vec![];
for i in 0..num_entries {
let slot = (i + 1) as u64 * 50;
if i % 3 == 0 {
let sample = slot_sample_v1(i as u64);
let bytes = serialize(&sample).unwrap();
blockstore.perf_samples_cf.put_bytes(slot, &bytes).unwrap();
perf_samples.push((slot, sample.into()));
} else {
let sample = slot_sample_v2(i as u64);
let bytes = serialize(&sample).unwrap();
blockstore.perf_samples_cf.put_bytes(slot, &bytes).unwrap();
perf_samples.push((slot, sample.into()));
}
}
for i in 0..num_entries {
let mut expected_samples = perf_samples[num_entries - 1 - i..].to_vec();
expected_samples.sort_by(|a, b| b.0.cmp(&a.0));
assert_eq!(
blockstore.get_recent_perf_samples(i + 1).unwrap(),
expected_samples
);
}
}
#[test]
fn test_write_perf_samples() {
let ledger_path = get_tmp_ledger_path_auto_delete!(); let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap(); let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let num_entries: usize = 10; let num_entries: usize = 10;
let mut perf_samples: Vec<(Slot, PerfSample)> = vec![]; let mut perf_samples: Vec<(Slot, PerfSample)> = vec![];
for x in 1..num_entries + 1 { for x in 1..num_entries + 1 {
perf_samples.push(( let slot = x as u64 * 50;
x as u64 * 50, let sample = PerfSampleV2 {
PerfSample { num_transactions: 1000 + x as u64,
num_transactions: 1000 + x as u64, num_slots: 50,
num_slots: 50, sample_period_secs: 20,
sample_period_secs: 20, num_non_vote_transactions: 300 + x as u64,
}, };
));
} blockstore.write_perf_sample(slot, &sample).unwrap();
for (slot, sample) in perf_samples.iter() { perf_samples.push((slot, PerfSample::V2(sample)));
blockstore.write_perf_sample(*slot, sample).unwrap();
} }
for x in 0..num_entries { for x in 0..num_entries {
let mut expected_samples = perf_samples[num_entries - 1 - x..].to_vec(); let mut expected_samples = perf_samples[num_entries - 1 - x..].to_vec();
expected_samples.sort_by(|a, b| b.0.cmp(&a.0)); expected_samples.sort_by(|a, b| b.0.cmp(&a.0));

View File

@ -832,9 +832,6 @@ impl SlotColumn for columns::PerfSamples {}
impl ColumnName for columns::PerfSamples { impl ColumnName for columns::PerfSamples {
const NAME: &'static str = PERF_SAMPLES_CF; const NAME: &'static str = PERF_SAMPLES_CF;
} }
impl TypedColumn for columns::PerfSamples {
type Type = blockstore_meta::PerfSample;
}
impl SlotColumn for columns::BlockHeight {} impl SlotColumn for columns::BlockHeight {}
impl ColumnName for columns::BlockHeight { impl ColumnName for columns::BlockHeight {

View File

@ -374,13 +374,48 @@ pub struct AddressSignatureMeta {
pub writeable: bool, pub writeable: bool,
} }
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)] /// Performance information about validator execution during a time slice.
pub struct PerfSample { ///
/// Older versions should only arise as a result of deserialization of entries stored by a previous
/// version of the validator. Current version should only produce [`PerfSampleV2`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PerfSample {
V1(PerfSampleV1),
V2(PerfSampleV2),
}
impl From<PerfSampleV1> for PerfSample {
fn from(value: PerfSampleV1) -> PerfSample {
PerfSample::V1(value)
}
}
impl From<PerfSampleV2> for PerfSample {
fn from(value: PerfSampleV2) -> PerfSample {
PerfSample::V2(value)
}
}
/// Version of [`PerfSample`] used before 1.15.x.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct PerfSampleV1 {
pub num_transactions: u64, pub num_transactions: u64,
pub num_slots: u64, pub num_slots: u64,
pub sample_period_secs: u16, pub sample_period_secs: u16,
} }
/// Version of the [`PerfSample`] introduced in 1.15.x.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct PerfSampleV2 {
// `PerfSampleV1` part
pub num_transactions: u64,
pub num_slots: u64,
pub sample_period_secs: u16,
// New fields.
pub num_non_vote_transactions: u64,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)] #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct ProgramCost { pub struct ProgramCost {
pub cost: u64, pub cost: u64,
@ -569,4 +604,28 @@ mod test {
expected.next_slots = vec![6, 7]; expected.next_slots = vec![6, 7];
assert_eq!(slot_meta, expected); assert_eq!(slot_meta, expected);
} }
// `PerfSampleV2` should contain `PerfSampleV1` as a prefix, in order for the column to be
// backward and forward compatible.
#[test]
fn perf_sample_v1_is_prefix_of_perf_sample_v2() {
let v2 = PerfSampleV2 {
num_transactions: 4190143848,
num_slots: 3607325588,
sample_period_secs: 31263,
num_non_vote_transactions: 4056116066,
};
let v2_bytes = bincode::serialize(&v2).expect("`PerfSampleV2` can be serialized");
let actual: PerfSampleV1 = bincode::deserialize(&v2_bytes)
.expect("Bytes encoded as `PerfSampleV2` can be decoded as `PerfSampleV1`");
let expected = PerfSampleV1 {
num_transactions: v2.num_transactions,
num_slots: v2.num_slots,
sample_period_secs: v2.sample_period_secs,
};
assert_eq!(actual, expected);
}
} }

View File

@ -22,6 +22,7 @@ use {
solana_ledger::{ solana_ledger::{
blockstore::{Blockstore, SignatureInfosForAddress}, blockstore::{Blockstore, SignatureInfosForAddress},
blockstore_db::BlockstoreError, blockstore_db::BlockstoreError,
blockstore_meta::{PerfSample, PerfSampleV1, PerfSampleV2},
get_tmp_ledger_path, get_tmp_ledger_path,
leader_schedule_cache::LeaderScheduleCache, leader_schedule_cache::LeaderScheduleCache,
}, },
@ -3437,13 +3438,8 @@ pub mod rpc_full {
warn!("get_recent_performance_samples failed: {:?}", err); warn!("get_recent_performance_samples failed: {:?}", err);
Error::invalid_request() Error::invalid_request()
})? })?
.iter() .into_iter()
.map(|(slot, sample)| RpcPerfSample { .map(|(slot, sample)| rpc_perf_sample_from_perf_sample(slot, sample))
slot: *slot,
num_transactions: sample.num_transactions,
num_slots: sample.num_slots,
sample_period_secs: sample.sample_period_secs,
})
.collect()) .collect())
} }
@ -4015,6 +4011,32 @@ pub mod rpc_full {
} }
} }
fn rpc_perf_sample_from_perf_sample(slot: u64, sample: PerfSample) -> RpcPerfSample {
match sample {
PerfSample::V1(PerfSampleV1 {
num_transactions,
num_slots,
sample_period_secs,
}) => RpcPerfSample {
slot,
num_transactions,
num_slots,
sample_period_secs,
},
PerfSample::V2(PerfSampleV2 {
num_transactions,
num_non_vote_transactions: _,
num_slots,
sample_period_secs,
}) => RpcPerfSample {
slot,
num_transactions,
num_slots,
sample_period_secs,
},
}
}
// RPC methods deprecated in v1.8 // RPC methods deprecated in v1.8
pub mod rpc_deprecated_v1_9 { pub mod rpc_deprecated_v1_9 {
#![allow(deprecated)] #![allow(deprecated)]
@ -4602,7 +4624,7 @@ pub mod tests {
solana_entry::entry::next_versioned_entry, solana_entry::entry::next_versioned_entry,
solana_gossip::socketaddr, solana_gossip::socketaddr,
solana_ledger::{ solana_ledger::{
blockstore_meta::PerfSample, blockstore_meta::PerfSampleV2,
blockstore_processor::fill_blockstore_slot_with_ticks, blockstore_processor::fill_blockstore_slot_with_ticks,
genesis_utils::{create_genesis_config, GenesisConfigInfo}, genesis_utils::{create_genesis_config, GenesisConfigInfo},
}, },
@ -5120,13 +5142,15 @@ pub mod tests {
let slot = 0; let slot = 0;
let num_slots = 1; let num_slots = 1;
let num_transactions = 4; let num_transactions = 4;
let num_non_vote_transactions = 1;
let sample_period_secs = 60; let sample_period_secs = 60;
rpc.blockstore rpc.blockstore
.write_perf_sample( .write_perf_sample(
slot, slot,
&PerfSample { &PerfSampleV2 {
num_slots, num_slots,
num_transactions, num_transactions,
num_non_vote_transactions,
sample_period_secs, sample_period_secs,
}, },
) )