From 59fde130d610325d6ecb19a8ab46fe76b1e749af Mon Sep 17 00:00:00 2001 From: Illia Bobyr Date: Thu, 12 Jan 2023 19:14:04 -0800 Subject: [PATCH] ledger/blockstore: PerfSampleV2: num_non_vote_transactions (#29404) Store non-vote transaction counts that are now recorded by the banks into the `blockstore`. `SamplePerformanceService` now populates `PerfSampleV2` with counts from the banks. --- core/src/sample_performance_service.rs | 53 ++++---- ledger/src/blockstore.rs | 175 ++++++++++++++++++++++--- ledger/src/blockstore_db.rs | 3 - ledger/src/blockstore_meta.rs | 63 ++++++++- rpc/src/rpc.rs | 42 ++++-- 5 files changed, 279 insertions(+), 57 deletions(-) diff --git a/core/src/sample_performance_service.rs b/core/src/sample_performance_service.rs index fe6820c91..8357a62db 100644 --- a/core/src/sample_performance_service.rs +++ b/core/src/sample_performance_service.rs @@ -1,5 +1,5 @@ use { - solana_ledger::{blockstore::Blockstore, blockstore_meta::PerfSample}, + solana_ledger::{blockstore::Blockstore, blockstore_meta::PerfSampleV2}, solana_runtime::bank_forks::BankForks, std::{ sync::{ @@ -16,7 +16,8 @@ const SLEEP_INTERVAL: u64 = 500; pub struct SamplePerformanceSnapshot { pub num_transactions: u64, - pub num_slots: u64, + pub num_non_vote_transactions: u64, + pub highest_slot: u64, } pub struct SamplePerformanceService { @@ -50,14 +51,15 @@ impl SamplePerformanceService { blockstore: &Arc, exit: Arc, ) { - let forks = bank_forks.read().unwrap(); - let bank = forks.root_bank(); - let highest_slot = forks.highest_slot(); - drop(forks); + let (bank, highest_slot) = { + let forks = bank_forks.read().unwrap(); + (forks.root_bank(), forks.highest_slot()) + }; - let mut sample_snapshot = SamplePerformanceSnapshot { + let mut snapshot = SamplePerformanceSnapshot { num_transactions: bank.transaction_count(), - num_slots: highest_slot, + num_non_vote_transactions: bank.non_vote_transaction_count_since_restart(), + highest_slot, }; let mut now = Instant::now(); @@ -70,19 +72,23 @@ impl SamplePerformanceService { if elapsed.as_secs() >= SAMPLE_INTERVAL { now = Instant::now(); - let bank_forks = bank_forks.read().unwrap(); - let bank = bank_forks.root_bank().clone(); - let highest_slot = bank_forks.highest_slot(); - drop(bank_forks); + let (bank, highest_slot) = { + let bank_forks = bank_forks.read().unwrap(); + (bank_forks.root_bank(), bank_forks.highest_slot()) + }; - let perf_sample = PerfSample { - num_slots: highest_slot - .checked_sub(sample_snapshot.num_slots) - .unwrap_or_default(), - num_transactions: bank - .transaction_count() - .checked_sub(sample_snapshot.num_transactions) - .unwrap_or_default(), + let num_slots = highest_slot.saturating_sub(snapshot.highest_slot); + let num_transactions = bank + .transaction_count() + .saturating_sub(snapshot.num_transactions); + let num_non_vote_transactions = bank + .non_vote_transaction_count_since_restart() + .saturating_sub(snapshot.num_non_vote_transactions); + + let perf_sample = PerfSampleV2 { + num_slots, + num_transactions, + num_non_vote_transactions, sample_period_secs: elapsed.as_secs() as u16, }; @@ -90,9 +96,10 @@ impl SamplePerformanceService { error!("write_perf_sample failed: slot {:?} {:?}", highest_slot, e); } - sample_snapshot = SamplePerformanceSnapshot { - num_transactions: bank.transaction_count(), - num_slots: highest_slot, + snapshot = SamplePerformanceSnapshot { + num_transactions, + num_non_vote_transactions, + highest_slot, }; } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 8e1552dc6..097daa97f 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -23,7 +23,7 @@ use { slot_stats::{ShredSource, SlotsStats}, }, assert_matches::debug_assert_matches, - bincode::deserialize, + bincode::{deserialize, serialize}, crossbeam_channel::{bounded, Receiver, Sender, TrySendError}, dashmap::DashSet, log::*, @@ -2712,19 +2712,40 @@ impl Blockstore { } pub fn get_recent_perf_samples(&self, num: usize) -> Result> { - Ok(self + // When reading `PerfSamples`, the database may contain samples with either `PerfSampleV1` + // or `PerfSampleV2` encoding. We expect `PerfSampleV1` to be a prefix of the + // `PerfSampleV2` encoding (see [`perf_sample_v1_is_prefix_of_perf_sample_v2`]), so we try + // them in order. + let samples = self .db .iter::(IteratorMode::End)? .take(num) .map(|(slot, data)| { - let perf_sample = deserialize(&data).unwrap(); - (slot, perf_sample) - }) - .collect()) + deserialize::(&data) + .map(|sample| (slot, sample.into())) + .or_else(|err| { + match &*err { + bincode::ErrorKind::Io(io_err) + if matches!(io_err.kind(), ErrorKind::UnexpectedEof) => + { + // Not enough bytes to deserialize as `PerfSampleV2`. + } + _ => return Err(err), + } + + deserialize::(&data).map(|sample| (slot, sample.into())) + }) + .map_err(Into::into) + }); + + samples.collect() } - pub fn write_perf_sample(&self, index: Slot, perf_sample: &PerfSample) -> Result<()> { - self.perf_samples_cf.put(index, perf_sample) + pub fn write_perf_sample(&self, index: Slot, perf_sample: &PerfSampleV2) -> Result<()> { + // Always write as the current version. + let bytes = + serialize(&perf_sample).expect("`PerfSampleV2` can be serialized with `bincode`"); + self.perf_samples_cf.put_bytes(index, &bytes) } pub fn read_program_costs(&self) -> Result> { @@ -8503,25 +8524,139 @@ pub mod tests { } #[test] - fn test_write_get_perf_samples() { + fn test_get_recent_perf_samples_v1_only() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let num_entries: usize = 10; + + let slot_sample = |i: u64| PerfSampleV1 { + num_transactions: 1406 + i, + num_slots: 34 + i / 2, + sample_period_secs: (40 + i / 5) as u16, + }; + + let mut perf_samples: Vec<(Slot, PerfSample)> = vec![]; + for i in 0..num_entries { + let slot = (i + 1) as u64 * 50; + let sample = slot_sample(i as u64); + + let bytes = serialize(&sample).unwrap(); + blockstore.perf_samples_cf.put_bytes(slot, &bytes).unwrap(); + perf_samples.push((slot, sample.into())); + } + + for i in 0..num_entries { + let mut expected_samples = perf_samples[num_entries - 1 - i..].to_vec(); + expected_samples.sort_by(|a, b| b.0.cmp(&a.0)); + assert_eq!( + blockstore.get_recent_perf_samples(i + 1).unwrap(), + expected_samples + ); + } + } + + #[test] + fn test_get_recent_perf_samples_v2_only() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let num_entries: usize = 10; + + let slot_sample = |i: u64| PerfSampleV2 { + num_transactions: 2495 + i, + num_slots: 167 + i / 2, + sample_period_secs: (37 + i / 5) as u16, + num_non_vote_transactions: 1672 + i, + }; + + let mut perf_samples: Vec<(Slot, PerfSample)> = vec![]; + for i in 0..num_entries { + let slot = (i + 1) as u64 * 50; + let sample = slot_sample(i as u64); + + let bytes = serialize(&sample).unwrap(); + blockstore.perf_samples_cf.put_bytes(slot, &bytes).unwrap(); + perf_samples.push((slot, sample.into())); + } + + for i in 0..num_entries { + let mut expected_samples = perf_samples[num_entries - 1 - i..].to_vec(); + expected_samples.sort_by(|a, b| b.0.cmp(&a.0)); + assert_eq!( + blockstore.get_recent_perf_samples(i + 1).unwrap(), + expected_samples + ); + } + } + + #[test] + fn test_get_recent_perf_samples_v1_and_v2() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let num_entries: usize = 10; + + let slot_sample_v1 = |i: u64| PerfSampleV1 { + num_transactions: 1599 + i, + num_slots: 123 + i / 2, + sample_period_secs: (42 + i / 5) as u16, + }; + + let slot_sample_v2 = |i: u64| PerfSampleV2 { + num_transactions: 5809 + i, + num_slots: 81 + i / 2, + sample_period_secs: (35 + i / 5) as u16, + num_non_vote_transactions: 2209 + i, + }; + + let mut perf_samples: Vec<(Slot, PerfSample)> = vec![]; + for i in 0..num_entries { + let slot = (i + 1) as u64 * 50; + + if i % 3 == 0 { + let sample = slot_sample_v1(i as u64); + let bytes = serialize(&sample).unwrap(); + blockstore.perf_samples_cf.put_bytes(slot, &bytes).unwrap(); + perf_samples.push((slot, sample.into())); + } else { + let sample = slot_sample_v2(i as u64); + let bytes = serialize(&sample).unwrap(); + blockstore.perf_samples_cf.put_bytes(slot, &bytes).unwrap(); + perf_samples.push((slot, sample.into())); + } + } + + for i in 0..num_entries { + let mut expected_samples = perf_samples[num_entries - 1 - i..].to_vec(); + expected_samples.sort_by(|a, b| b.0.cmp(&a.0)); + assert_eq!( + blockstore.get_recent_perf_samples(i + 1).unwrap(), + expected_samples + ); + } + } + + #[test] + fn test_write_perf_samples() { let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()).unwrap(); let num_entries: usize = 10; let mut perf_samples: Vec<(Slot, PerfSample)> = vec![]; for x in 1..num_entries + 1 { - perf_samples.push(( - x as u64 * 50, - PerfSample { - num_transactions: 1000 + x as u64, - num_slots: 50, - sample_period_secs: 20, - }, - )); - } - for (slot, sample) in perf_samples.iter() { - blockstore.write_perf_sample(*slot, sample).unwrap(); + let slot = x as u64 * 50; + let sample = PerfSampleV2 { + num_transactions: 1000 + x as u64, + num_slots: 50, + sample_period_secs: 20, + num_non_vote_transactions: 300 + x as u64, + }; + + blockstore.write_perf_sample(slot, &sample).unwrap(); + perf_samples.push((slot, PerfSample::V2(sample))); } + for x in 0..num_entries { let mut expected_samples = perf_samples[num_entries - 1 - x..].to_vec(); expected_samples.sort_by(|a, b| b.0.cmp(&a.0)); diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index db6ce7e79..c020986e0 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -832,9 +832,6 @@ impl SlotColumn for columns::PerfSamples {} impl ColumnName for columns::PerfSamples { const NAME: &'static str = PERF_SAMPLES_CF; } -impl TypedColumn for columns::PerfSamples { - type Type = blockstore_meta::PerfSample; -} impl SlotColumn for columns::BlockHeight {} impl ColumnName for columns::BlockHeight { diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 9c3af2d45..fd416da07 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -374,13 +374,48 @@ pub struct AddressSignatureMeta { pub writeable: bool, } -#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)] -pub struct PerfSample { +/// Performance information about validator execution during a time slice. +/// +/// Older versions should only arise as a result of deserialization of entries stored by a previous +/// version of the validator. Current version should only produce [`PerfSampleV2`]. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum PerfSample { + V1(PerfSampleV1), + V2(PerfSampleV2), +} + +impl From for PerfSample { + fn from(value: PerfSampleV1) -> PerfSample { + PerfSample::V1(value) + } +} + +impl From for PerfSample { + fn from(value: PerfSampleV2) -> PerfSample { + PerfSample::V2(value) + } +} + +/// Version of [`PerfSample`] used before 1.15.x. +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct PerfSampleV1 { pub num_transactions: u64, pub num_slots: u64, pub sample_period_secs: u16, } +/// Version of the [`PerfSample`] introduced in 1.15.x. +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct PerfSampleV2 { + // `PerfSampleV1` part + pub num_transactions: u64, + pub num_slots: u64, + pub sample_period_secs: u16, + + // New fields. + pub num_non_vote_transactions: u64, +} + #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)] pub struct ProgramCost { pub cost: u64, @@ -569,4 +604,28 @@ mod test { expected.next_slots = vec![6, 7]; assert_eq!(slot_meta, expected); } + + // `PerfSampleV2` should contain `PerfSampleV1` as a prefix, in order for the column to be + // backward and forward compatible. + #[test] + fn perf_sample_v1_is_prefix_of_perf_sample_v2() { + let v2 = PerfSampleV2 { + num_transactions: 4190143848, + num_slots: 3607325588, + sample_period_secs: 31263, + num_non_vote_transactions: 4056116066, + }; + + let v2_bytes = bincode::serialize(&v2).expect("`PerfSampleV2` can be serialized"); + + let actual: PerfSampleV1 = bincode::deserialize(&v2_bytes) + .expect("Bytes encoded as `PerfSampleV2` can be decoded as `PerfSampleV1`"); + let expected = PerfSampleV1 { + num_transactions: v2.num_transactions, + num_slots: v2.num_slots, + sample_period_secs: v2.sample_period_secs, + }; + + assert_eq!(actual, expected); + } } diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index e25097ab2..e42134beb 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -22,6 +22,7 @@ use { solana_ledger::{ blockstore::{Blockstore, SignatureInfosForAddress}, blockstore_db::BlockstoreError, + blockstore_meta::{PerfSample, PerfSampleV1, PerfSampleV2}, get_tmp_ledger_path, leader_schedule_cache::LeaderScheduleCache, }, @@ -3437,13 +3438,8 @@ pub mod rpc_full { warn!("get_recent_performance_samples failed: {:?}", err); Error::invalid_request() })? - .iter() - .map(|(slot, sample)| RpcPerfSample { - slot: *slot, - num_transactions: sample.num_transactions, - num_slots: sample.num_slots, - sample_period_secs: sample.sample_period_secs, - }) + .into_iter() + .map(|(slot, sample)| rpc_perf_sample_from_perf_sample(slot, sample)) .collect()) } @@ -4015,6 +4011,32 @@ pub mod rpc_full { } } +fn rpc_perf_sample_from_perf_sample(slot: u64, sample: PerfSample) -> RpcPerfSample { + match sample { + PerfSample::V1(PerfSampleV1 { + num_transactions, + num_slots, + sample_period_secs, + }) => RpcPerfSample { + slot, + num_transactions, + num_slots, + sample_period_secs, + }, + PerfSample::V2(PerfSampleV2 { + num_transactions, + num_non_vote_transactions: _, + num_slots, + sample_period_secs, + }) => RpcPerfSample { + slot, + num_transactions, + num_slots, + sample_period_secs, + }, + } +} + // RPC methods deprecated in v1.8 pub mod rpc_deprecated_v1_9 { #![allow(deprecated)] @@ -4602,7 +4624,7 @@ pub mod tests { solana_entry::entry::next_versioned_entry, solana_gossip::socketaddr, solana_ledger::{ - blockstore_meta::PerfSample, + blockstore_meta::PerfSampleV2, blockstore_processor::fill_blockstore_slot_with_ticks, genesis_utils::{create_genesis_config, GenesisConfigInfo}, }, @@ -5120,13 +5142,15 @@ pub mod tests { let slot = 0; let num_slots = 1; let num_transactions = 4; + let num_non_vote_transactions = 1; let sample_period_secs = 60; rpc.blockstore .write_perf_sample( slot, - &PerfSample { + &PerfSampleV2 { num_slots, num_transactions, + num_non_vote_transactions, sample_period_secs, }, )