diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs index 7bd821b348..aa891fe0cd 100644 --- a/storage-bigtable/src/lib.rs +++ b/storage-bigtable/src/lib.rs @@ -4,7 +4,7 @@ use { crate::bigtable::RowKey, log::*, serde::{Deserialize, Serialize}, - solana_metrics::{datapoint_info, inc_new_counter_debug}, + solana_metrics::datapoint_info, solana_sdk::{ clock::{Slot, UnixTimestamp}, deserialize_utils::default_on_eof, @@ -12,6 +12,7 @@ use { pubkey::Pubkey, signature::Signature, sysvar::is_sysvar_id, + timing::AtomicInterval, transaction::{TransactionError, VersionedTransaction}, }, solana_storage_proto::convert::{generated, tx_by_addr}, @@ -24,6 +25,10 @@ use { std::{ collections::{HashMap, HashSet}, convert::TryInto, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, time::Duration, }, thiserror::Error, @@ -399,9 +404,38 @@ impl Default for LedgerStorageConfig { } } +const METRICS_REPORT_INTERVAL_MS: u64 = 10_000; + +#[derive(Default)] +struct LedgerStorageStats { + num_queries: AtomicUsize, + last_report: AtomicInterval, +} + +impl LedgerStorageStats { + fn increment_num_queries(&self) { + self.num_queries.fetch_add(1, Ordering::Relaxed); + self.maybe_report(); + } + + fn maybe_report(&self) { + if self.last_report.should_update(METRICS_REPORT_INTERVAL_MS) { + datapoint_debug!( + "storage-bigtable-query", + ( + "num_queries", + self.num_queries.swap(0, Ordering::Relaxed) as i64, + i64 + ) + ); + } + } +} + #[derive(Clone)] pub struct LedgerStorage { connection: bigtable::BigTableConnection, + stats: Arc, } impl LedgerStorage { @@ -425,6 +459,7 @@ impl LedgerStorage { endpoint: &str, timeout: Option, ) -> Result { + let stats = Arc::new(LedgerStorageStats::default()); Ok(Self { connection: bigtable::BigTableConnection::new_for_emulator( instance_name, @@ -432,10 +467,12 @@ impl LedgerStorage { endpoint, timeout, )?, + stats, }) } pub async fn new_with_config(config: LedgerStorageConfig) -> Result { + let stats = Arc::new(LedgerStorageStats::default()); let LedgerStorageConfig { read_only, timeout, @@ -451,7 +488,7 @@ impl LedgerStorage { credential_type, ) .await?; - Ok(Self { connection }) + Ok(Self { stats, connection }) } pub async fn new_with_stringified_credential(credential: String) -> Result { @@ -464,8 +501,8 @@ impl LedgerStorage { /// Return the available slot that contains a block pub async fn get_first_available_block(&self) -> Result> { - debug!("LedgerStorage::get_first_available_block request received"); - inc_new_counter_debug!("storage-bigtable-query", 1); + trace!("LedgerStorage::get_first_available_block request received"); + self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let blocks = bigtable.get_row_keys("blocks", None, None, 1).await?; if blocks.is_empty() { @@ -479,11 +516,12 @@ impl LedgerStorage { /// start_slot: slot to start the search from (inclusive) /// limit: stop after this many slots have been found pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result> { - debug!( + trace!( "LedgerStorage::get_confirmed_blocks request received: {:?} {:?}", - start_slot, limit + start_slot, + limit ); - inc_new_counter_debug!("storage-bigtable-query", 1); + self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let blocks = bigtable .get_row_keys( @@ -501,11 +539,11 @@ impl LedgerStorage { &self, slots: &'a [Slot], ) -> Result + 'a> { - debug!( + trace!( "LedgerStorage::get_confirmed_blocks_with_data request received: {:?}", slots ); - inc_new_counter_debug!("storage-bigtable-query", 1); + self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let row_keys = slots.iter().copied().map(slot_to_blocks_key); let data = bigtable @@ -528,11 +566,11 @@ impl LedgerStorage { /// Fetch the confirmed block from the desired slot pub async fn get_confirmed_block(&self, slot: Slot) -> Result { - debug!( + trace!( "LedgerStorage::get_confirmed_block request received: {:?}", slot ); - inc_new_counter_debug!("storage-bigtable-query", 1); + self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let block_cell_data = bigtable .get_protobuf_or_bincode_cell::( @@ -554,11 +592,11 @@ impl LedgerStorage { /// Does the confirmed block exist in the Bigtable pub async fn confirmed_block_exists(&self, slot: Slot) -> Result { - debug!( + trace!( "LedgerStorage::confirmed_block_exists request received: {:?}", slot ); - inc_new_counter_debug!("storage-bigtable-query", 1); + self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let block_exists = bigtable @@ -569,11 +607,11 @@ impl LedgerStorage { } pub async fn get_signature_status(&self, signature: &Signature) -> Result { - debug!( + trace!( "LedgerStorage::get_signature_status request received: {:?}", signature ); - inc_new_counter_debug!("storage-bigtable-query", 1); + self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let transaction_info = bigtable .get_bincode_cell::("tx", signature.to_string()) @@ -590,11 +628,11 @@ impl LedgerStorage { &self, signatures: &[Signature], ) -> Result> { - debug!( + trace!( "LedgerStorage::get_confirmed_transactions request received: {:?}", signatures ); - inc_new_counter_debug!("storage-bigtable-query", 1); + self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); // Fetch transactions info @@ -652,11 +690,11 @@ impl LedgerStorage { &self, signature: &Signature, ) -> Result> { - debug!( + trace!( "LedgerStorage::get_confirmed_transaction request received: {:?}", signature ); - inc_new_counter_debug!("storage-bigtable-query", 1); + self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); // Figure out which block the transaction is located in @@ -712,11 +750,11 @@ impl LedgerStorage { u32, /*slot index*/ )>, > { - debug!( + trace!( "LedgerStorage::get_confirmed_signatures_for_address request received: {:?}", address ); - inc_new_counter_debug!("storage-bigtable-query", 1); + self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let address_prefix = format!("{address}/"); @@ -835,12 +873,16 @@ impl LedgerStorage { Ok(infos) } - // Upload a new confirmed block and associated meta data. + /// Upload a new confirmed block and associated meta data. pub async fn upload_confirmed_block( &self, slot: Slot, confirmed_block: VersionedConfirmedBlock, ) -> Result<()> { + trace!( + "LedgerStorage::upload_confirmed_block request received: {:?}", + slot + ); let mut by_addr: HashMap<&Pubkey, Vec> = HashMap::new(); let mut tx_cells = vec![];