#![allow(clippy::arithmetic_side_effects)] use { crate::bigtable::RowKey, log::*, serde::{Deserialize, Serialize}, solana_metrics::datapoint_info, solana_sdk::{ clock::{Slot, UnixTimestamp}, deserialize_utils::default_on_eof, message::v0::LoadedAddresses, pubkey::Pubkey, signature::Signature, sysvar::is_sysvar_id, timing::AtomicInterval, transaction::{TransactionError, VersionedTransaction}, }, solana_storage_proto::convert::{entries, generated, tx_by_addr}, solana_transaction_status::{ extract_and_fmt_memos, ConfirmedBlock, ConfirmedTransactionStatusWithSignature, ConfirmedTransactionWithStatusMeta, EntrySummary, Reward, TransactionByAddrInfo, TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta, TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedConfirmedBlockWithEntries, VersionedTransactionWithStatusMeta, }, std::{ collections::{HashMap, HashSet}, convert::TryInto, sync::{ atomic::{AtomicUsize, Ordering}, Arc, }, time::Duration, }, thiserror::Error, tokio::task::JoinError, }; #[macro_use] extern crate solana_metrics; #[macro_use] extern crate serde_derive; mod access_token; mod bigtable; mod compression; mod root_ca_certificate; #[derive(Debug, Error)] pub enum Error { #[error("BigTable: {0}")] BigTableError(bigtable::Error), #[error("I/O Error: {0}")] IoError(std::io::Error), #[error("Transaction encoded is not supported")] UnsupportedTransactionEncoding, #[error("Block not found: {0}")] BlockNotFound(Slot), #[error("Signature not found")] SignatureNotFound, #[error("tokio error")] TokioJoinError(JoinError), } impl std::convert::From for Error { fn from(err: bigtable::Error) -> Self { Self::BigTableError(err) } } impl std::convert::From for Error { fn from(err: std::io::Error) -> Self { Self::IoError(err) } } pub type Result = std::result::Result; // Convert a slot to its bucket representation whereby lower slots are always lexically ordered // before higher slots fn slot_to_key(slot: Slot) -> String { format!("{slot:016x}") } fn slot_to_blocks_key(slot: Slot) -> String { slot_to_key(slot) } fn slot_to_entries_key(slot: Slot) -> String { slot_to_key(slot) } fn slot_to_tx_by_addr_key(slot: Slot) -> String { slot_to_key(!slot) } // Reverse of `slot_to_key` fn key_to_slot(key: &str) -> Option { match Slot::from_str_radix(key, 16) { Ok(slot) => Some(slot), Err(err) => { // bucket data is probably corrupt warn!("Failed to parse object key as a slot: {}: {}", key, err); None } } } // A serialized `StoredConfirmedBlock` is stored in the `block` table // // StoredConfirmedBlock holds the same contents as ConfirmedBlock, but is slightly compressed and avoids // some serde JSON directives that cause issues with bincode // // Note: in order to continue to support old bincode-serialized bigtable entries, if new fields are // added to ConfirmedBlock, they must either be excluded or set to `default_on_eof` here // #[derive(Serialize, Deserialize)] struct StoredConfirmedBlock { previous_blockhash: String, blockhash: String, parent_slot: Slot, transactions: Vec, rewards: StoredConfirmedBlockRewards, block_time: Option, #[serde(deserialize_with = "default_on_eof")] block_height: Option, } #[cfg(test)] impl From for StoredConfirmedBlock { fn from(confirmed_block: ConfirmedBlock) -> Self { let ConfirmedBlock { previous_blockhash, blockhash, parent_slot, transactions, rewards, block_time, block_height, } = confirmed_block; Self { previous_blockhash, blockhash, parent_slot, transactions: transactions.into_iter().map(|tx| tx.into()).collect(), rewards: rewards.into_iter().map(|reward| reward.into()).collect(), block_time, block_height, } } } impl From for ConfirmedBlock { fn from(confirmed_block: StoredConfirmedBlock) -> Self { let StoredConfirmedBlock { previous_blockhash, blockhash, parent_slot, transactions, rewards, block_time, block_height, } = confirmed_block; Self { previous_blockhash, blockhash, parent_slot, transactions: transactions.into_iter().map(|tx| tx.into()).collect(), rewards: rewards.into_iter().map(|reward| reward.into()).collect(), block_time, block_height, } } } #[derive(Serialize, Deserialize)] struct StoredConfirmedBlockTransaction { transaction: VersionedTransaction, meta: Option, } #[cfg(test)] impl From for StoredConfirmedBlockTransaction { fn from(value: TransactionWithStatusMeta) -> Self { match value { TransactionWithStatusMeta::MissingMetadata(transaction) => Self { transaction: VersionedTransaction::from(transaction), meta: None, }, TransactionWithStatusMeta::Complete(VersionedTransactionWithStatusMeta { transaction, meta, }) => Self { transaction, meta: Some(meta.into()), }, } } } impl From for TransactionWithStatusMeta { fn from(tx_with_meta: StoredConfirmedBlockTransaction) -> Self { let StoredConfirmedBlockTransaction { transaction, meta } = tx_with_meta; match meta { None => Self::MissingMetadata( transaction .into_legacy_transaction() .expect("versioned transactions always have meta"), ), Some(meta) => Self::Complete(VersionedTransactionWithStatusMeta { transaction, meta: meta.into(), }), } } } #[derive(Serialize, Deserialize)] struct StoredConfirmedBlockTransactionStatusMeta { err: Option, fee: u64, pre_balances: Vec, post_balances: Vec, } impl From for TransactionStatusMeta { fn from(value: StoredConfirmedBlockTransactionStatusMeta) -> Self { let StoredConfirmedBlockTransactionStatusMeta { err, fee, pre_balances, post_balances, } = value; let status = match &err { None => Ok(()), Some(err) => Err(err.clone()), }; Self { status, fee, pre_balances, post_balances, inner_instructions: None, log_messages: None, pre_token_balances: None, post_token_balances: None, rewards: None, loaded_addresses: LoadedAddresses::default(), return_data: None, compute_units_consumed: None, } } } impl From for StoredConfirmedBlockTransactionStatusMeta { fn from(value: TransactionStatusMeta) -> Self { let TransactionStatusMeta { status, fee, pre_balances, post_balances, .. } = value; Self { err: status.err(), fee, pre_balances, post_balances, } } } type StoredConfirmedBlockRewards = Vec; #[derive(Serialize, Deserialize)] struct StoredConfirmedBlockReward { pubkey: String, lamports: i64, } impl From for Reward { fn from(value: StoredConfirmedBlockReward) -> Self { let StoredConfirmedBlockReward { pubkey, lamports } = value; Self { pubkey, lamports, post_balance: 0, reward_type: None, commission: None, } } } impl From for StoredConfirmedBlockReward { fn from(value: Reward) -> Self { let Reward { pubkey, lamports, .. } = value; Self { pubkey, lamports } } } // A serialized `TransactionInfo` is stored in the `tx` table #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] struct TransactionInfo { slot: Slot, // The slot that contains the block with this transaction in it index: u32, // Where the transaction is located in the block err: Option, // None if the transaction executed successfully memo: Option, // Transaction memo } // Part of a serialized `TransactionInfo` which is stored in the `tx` table #[derive(PartialEq, Eq, Debug)] struct UploadedTransaction { slot: Slot, // The slot that contains the block with this transaction in it index: u32, // Where the transaction is located in the block err: Option, // None if the transaction executed successfully } impl From for UploadedTransaction { fn from(transaction_info: TransactionInfo) -> Self { Self { slot: transaction_info.slot, index: transaction_info.index, err: transaction_info.err, } } } impl From for TransactionStatus { fn from(transaction_info: TransactionInfo) -> Self { let TransactionInfo { slot, err, .. } = transaction_info; let status = match &err { None => Ok(()), Some(err) => Err(err.clone()), }; Self { slot, confirmations: None, status, err, confirmation_status: Some(TransactionConfirmationStatus::Finalized), } } } #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] struct LegacyTransactionByAddrInfo { pub signature: Signature, // The transaction signature pub err: Option, // None if the transaction executed successfully pub index: u32, // Where the transaction is located in the block pub memo: Option, // Transaction memo } impl From for TransactionByAddrInfo { fn from(legacy: LegacyTransactionByAddrInfo) -> Self { let LegacyTransactionByAddrInfo { signature, err, index, memo, } = legacy; Self { signature, err, index, memo, block_time: None, } } } pub const DEFAULT_INSTANCE_NAME: &str = "solana-ledger"; pub const DEFAULT_APP_PROFILE_ID: &str = "default"; #[derive(Debug)] pub enum CredentialType { Filepath(Option), Stringified(String), } #[derive(Debug)] pub struct LedgerStorageConfig { pub read_only: bool, pub timeout: Option, pub credential_type: CredentialType, pub instance_name: String, pub app_profile_id: String, } impl Default for LedgerStorageConfig { fn default() -> Self { Self { read_only: true, timeout: None, credential_type: CredentialType::Filepath(None), instance_name: DEFAULT_INSTANCE_NAME.to_string(), app_profile_id: DEFAULT_APP_PROFILE_ID.to_string(), } } } const METRICS_REPORT_INTERVAL_MS: u64 = 10_000; #[derive(Default)] struct LedgerStorageStats { num_queries: AtomicUsize, last_report: AtomicInterval, } impl LedgerStorageStats { fn increment_num_queries(&self) { self.num_queries.fetch_add(1, Ordering::Relaxed); self.maybe_report(); } fn maybe_report(&self) { if self.last_report.should_update(METRICS_REPORT_INTERVAL_MS) { datapoint_debug!( "storage-bigtable-query", ( "num_queries", self.num_queries.swap(0, Ordering::Relaxed) as i64, i64 ) ); } } } #[derive(Clone)] pub struct LedgerStorage { connection: bigtable::BigTableConnection, stats: Arc, } impl LedgerStorage { pub async fn new( read_only: bool, timeout: Option, credential_path: Option, ) -> Result { Self::new_with_config(LedgerStorageConfig { read_only, timeout, credential_type: CredentialType::Filepath(credential_path), ..LedgerStorageConfig::default() }) .await } pub fn new_for_emulator( instance_name: &str, app_profile_id: &str, endpoint: &str, timeout: Option, ) -> Result { let stats = Arc::new(LedgerStorageStats::default()); Ok(Self { connection: bigtable::BigTableConnection::new_for_emulator( instance_name, app_profile_id, endpoint, timeout, )?, stats, }) } pub async fn new_with_config(config: LedgerStorageConfig) -> Result { let stats = Arc::new(LedgerStorageStats::default()); let LedgerStorageConfig { read_only, timeout, instance_name, app_profile_id, credential_type, } = config; let connection = bigtable::BigTableConnection::new( instance_name.as_str(), app_profile_id.as_str(), read_only, timeout, credential_type, ) .await?; Ok(Self { stats, connection }) } pub async fn new_with_stringified_credential(credential: String) -> Result { Self::new_with_config(LedgerStorageConfig { credential_type: CredentialType::Stringified(credential), ..LedgerStorageConfig::default() }) .await } /// Return the available slot that contains a block pub async fn get_first_available_block(&self) -> Result> { trace!("LedgerStorage::get_first_available_block request received"); self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let blocks = bigtable.get_row_keys("blocks", None, None, 1).await?; if blocks.is_empty() { return Ok(None); } Ok(key_to_slot(&blocks[0])) } /// Fetch the next slots after the provided slot that contains a block /// /// start_slot: slot to start the search from (inclusive) /// limit: stop after this many slots have been found pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result> { trace!( "LedgerStorage::get_confirmed_blocks request received: {:?} {:?}", start_slot, limit ); self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let blocks = bigtable .get_row_keys( "blocks", Some(slot_to_blocks_key(start_slot)), None, limit as i64, ) .await?; Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect()) } // Fetches and gets a vector of confirmed blocks via a multirow fetch pub async fn get_confirmed_blocks_with_data<'a>( &self, slots: &'a [Slot], ) -> Result + 'a> { trace!( "LedgerStorage::get_confirmed_blocks_with_data request received: {:?}", slots ); self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let row_keys = slots.iter().copied().map(slot_to_blocks_key); let data = bigtable .get_protobuf_or_bincode_cells("blocks", row_keys) .await? .filter_map( |(row_key, block_cell_data): ( RowKey, bigtable::CellData, )| { let block = match block_cell_data { bigtable::CellData::Bincode(block) => block.into(), bigtable::CellData::Protobuf(block) => block.try_into().ok()?, }; Some((key_to_slot(&row_key).unwrap(), block)) }, ); Ok(data) } /// Fetch the confirmed block from the desired slot pub async fn get_confirmed_block(&self, slot: Slot) -> Result { trace!( "LedgerStorage::get_confirmed_block request received: {:?}", slot ); self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let block_cell_data = bigtable .get_protobuf_or_bincode_cell::( "blocks", slot_to_blocks_key(slot), ) .await .map_err(|err| match err { bigtable::Error::RowNotFound => Error::BlockNotFound(slot), _ => err.into(), })?; Ok(match block_cell_data { bigtable::CellData::Bincode(block) => block.into(), bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| { bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_blocks_key(slot))) })?, }) } /// Does the confirmed block exist in the Bigtable pub async fn confirmed_block_exists(&self, slot: Slot) -> Result { trace!( "LedgerStorage::confirmed_block_exists request received: {:?}", slot ); self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let block_exists = bigtable .row_key_exists("blocks", slot_to_blocks_key(slot)) .await?; Ok(block_exists) } /// Fetches a vector of block entries via a multirow fetch pub async fn get_entries(&self, slot: Slot) -> Result> { trace!( "LedgerStorage::get_block_entries request received: {:?}", slot ); self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let entry_cell_data = bigtable .get_protobuf_cell::("entries", slot_to_entries_key(slot)) .await .map_err(|err| match err { bigtable::Error::RowNotFound => Error::BlockNotFound(slot), _ => err.into(), })?; let entries = entry_cell_data.entries.into_iter().map(Into::into); Ok(entries) } pub async fn get_signature_status(&self, signature: &Signature) -> Result { trace!( "LedgerStorage::get_signature_status request received: {:?}", signature ); self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let transaction_info = bigtable .get_bincode_cell::("tx", signature.to_string()) .await .map_err(|err| match err { bigtable::Error::RowNotFound => Error::SignatureNotFound, _ => err.into(), })?; Ok(transaction_info.into()) } // Fetches and gets a vector of confirmed transactions via a multirow fetch pub async fn get_confirmed_transactions( &self, signatures: &[Signature], ) -> Result> { trace!( "LedgerStorage::get_confirmed_transactions request received: {:?}", signatures ); self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); // Fetch transactions info let keys = signatures.iter().map(|s| s.to_string()).collect::>(); let cells = bigtable .get_bincode_cells::("tx", &keys) .await?; // Collect by slot let mut order: Vec<(Slot, u32, String)> = Vec::new(); let mut slots: HashSet = HashSet::new(); for cell in cells { if let (signature, Ok(TransactionInfo { slot, index, .. })) = cell { order.push((slot, index, signature)); slots.insert(slot); } } // Fetch blocks let blocks = self .get_confirmed_blocks_with_data(&slots.into_iter().collect::>()) .await? .collect::>(); // Extract transactions Ok(order .into_iter() .filter_map(|(slot, index, signature)| { blocks.get(&slot).and_then(|block| { block .transactions .get(index as usize) .and_then(|tx_with_meta| { if tx_with_meta.transaction_signature().to_string() != *signature { warn!( "Transaction info or confirmed block for {} is corrupt", signature ); None } else { Some(ConfirmedTransactionWithStatusMeta { slot, tx_with_meta: tx_with_meta.clone(), block_time: block.block_time, }) } }) }) }) .collect::>()) } /// Fetch a confirmed transaction pub async fn get_confirmed_transaction( &self, signature: &Signature, ) -> Result> { trace!( "LedgerStorage::get_confirmed_transaction request received: {:?}", signature ); self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); // Figure out which block the transaction is located in let TransactionInfo { slot, index, .. } = bigtable .get_bincode_cell("tx", signature.to_string()) .await .map_err(|err| match err { bigtable::Error::RowNotFound => Error::SignatureNotFound, _ => err.into(), })?; // Load the block and return the transaction let block = self.get_confirmed_block(slot).await?; match block.transactions.into_iter().nth(index as usize) { None => { // report this somewhere actionable? warn!("Transaction info for {} is corrupt", signature); Ok(None) } Some(tx_with_meta) => { if tx_with_meta.transaction_signature() != signature { warn!( "Transaction info or confirmed block for {} is corrupt", signature ); Ok(None) } else { Ok(Some(ConfirmedTransactionWithStatusMeta { slot, tx_with_meta, block_time: block.block_time, })) } } } } /// Get confirmed signatures for the provided address, in descending ledger order /// /// address: address to search for /// before_signature: start with the first signature older than this one /// until_signature: end with the last signature more recent than this one /// limit: stop after this many signatures; if limit==0, all records in the table will be read pub async fn get_confirmed_signatures_for_address( &self, address: &Pubkey, before_signature: Option<&Signature>, until_signature: Option<&Signature>, limit: usize, ) -> Result< Vec<( ConfirmedTransactionStatusWithSignature, u32, /*slot index*/ )>, > { trace!( "LedgerStorage::get_confirmed_signatures_for_address request received: {:?}", address ); self.stats.increment_num_queries(); let mut bigtable = self.connection.client(); let address_prefix = format!("{address}/"); // Figure out where to start listing from based on `before_signature` let (first_slot, before_transaction_index) = match before_signature { None => (Slot::MAX, 0), Some(before_signature) => { let TransactionInfo { slot, index, .. } = bigtable .get_bincode_cell("tx", before_signature.to_string()) .await?; (slot, index) } }; // Figure out where to end listing from based on `until_signature` let (last_slot, until_transaction_index) = match until_signature { None => (0, u32::MAX), Some(until_signature) => { let TransactionInfo { slot, index, .. } = bigtable .get_bincode_cell("tx", until_signature.to_string()) .await?; (slot, index) } }; let mut infos = vec![]; let starting_slot_tx_len = bigtable .get_protobuf_or_bincode_cell::, tx_by_addr::TransactionByAddr>( "tx-by-addr", format!("{}{}", address_prefix, slot_to_tx_by_addr_key(first_slot)), ) .await .map(|cell_data| { match cell_data { bigtable::CellData::Bincode(tx_by_addr) => tx_by_addr.len(), bigtable::CellData::Protobuf(tx_by_addr) => tx_by_addr.tx_by_addrs.len(), } }) .unwrap_or(0); // Return the next tx-by-addr data of amount `limit` plus extra to account for the largest // number that might be flitered out let tx_by_addr_data = bigtable .get_row_data( "tx-by-addr", Some(format!( "{}{}", address_prefix, slot_to_tx_by_addr_key(first_slot), )), Some(format!( "{}{}", address_prefix, slot_to_tx_by_addr_key(last_slot), )), limit as i64 + starting_slot_tx_len as i64, ) .await?; 'outer: for (row_key, data) in tx_by_addr_data { let slot = !key_to_slot(&row_key[address_prefix.len()..]).ok_or_else(|| { bigtable::Error::ObjectCorrupt(format!( "Failed to convert key to slot: tx-by-addr/{row_key}" )) })?; let deserialized_cell_data = bigtable::deserialize_protobuf_or_bincode_cell_data::< Vec, tx_by_addr::TransactionByAddr, >(&data, "tx-by-addr", row_key.clone())?; let mut cell_data: Vec = match deserialized_cell_data { bigtable::CellData::Bincode(tx_by_addr) => { tx_by_addr.into_iter().map(|legacy| legacy.into()).collect() } bigtable::CellData::Protobuf(tx_by_addr) => { tx_by_addr.try_into().map_err(|error| { bigtable::Error::ObjectCorrupt(format!( "Failed to deserialize: {}: tx-by-addr/{}", error, row_key.clone() )) })? } }; cell_data.reverse(); for tx_by_addr_info in cell_data.into_iter() { // Filter out records before `before_transaction_index` if slot == first_slot && tx_by_addr_info.index >= before_transaction_index { continue; } // Filter out records after `until_transaction_index` if slot == last_slot && tx_by_addr_info.index <= until_transaction_index { continue; } infos.push(( ConfirmedTransactionStatusWithSignature { signature: tx_by_addr_info.signature, slot, err: tx_by_addr_info.err, memo: tx_by_addr_info.memo, block_time: tx_by_addr_info.block_time, }, tx_by_addr_info.index, )); // Respect limit if infos.len() >= limit { break 'outer; } } } Ok(infos) } /// Upload a new confirmed block and associated meta data. pub async fn upload_confirmed_block( &self, slot: Slot, confirmed_block: VersionedConfirmedBlock, ) -> Result<()> { trace!( "LedgerStorage::upload_confirmed_block request received: {:?}", slot ); self.upload_confirmed_block_with_entries( slot, VersionedConfirmedBlockWithEntries { block: confirmed_block, entries: vec![], }, ) .await } pub async fn upload_confirmed_block_with_entries( &self, slot: Slot, confirmed_block: VersionedConfirmedBlockWithEntries, ) -> Result<()> { trace!( "LedgerStorage::upload_confirmed_block_with_entries request received: {:?}", slot ); let mut by_addr: HashMap<&Pubkey, Vec> = HashMap::new(); let VersionedConfirmedBlockWithEntries { block: confirmed_block, entries, } = confirmed_block; let mut tx_cells = Vec::with_capacity(confirmed_block.transactions.len()); for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() { let VersionedTransactionWithStatusMeta { meta, transaction } = transaction_with_meta; let err = meta.status.clone().err(); let index = index as u32; let signature = transaction.signatures[0]; let memo = extract_and_fmt_memos(transaction_with_meta); for address in transaction_with_meta.account_keys().iter() { if !is_sysvar_id(address) { by_addr .entry(address) .or_default() .push(TransactionByAddrInfo { signature, err: err.clone(), index, memo: memo.clone(), block_time: confirmed_block.block_time, }); } } tx_cells.push(( signature.to_string(), TransactionInfo { slot, index, err, memo, }, )); } let tx_by_addr_cells: Vec<_> = by_addr .into_iter() .map(|(address, transaction_info_by_addr)| { ( format!("{}/{}", address, slot_to_tx_by_addr_key(slot)), tx_by_addr::TransactionByAddr { tx_by_addrs: transaction_info_by_addr .into_iter() .map(|by_addr| by_addr.into()) .collect(), }, ) }) .collect(); let num_entries = entries.len(); let entry_cell = ( slot_to_entries_key(slot), entries::Entries { entries: entries.into_iter().enumerate().map(Into::into).collect(), }, ); let mut tasks = vec![]; if !tx_cells.is_empty() { let conn = self.connection.clone(); tasks.push(tokio::spawn(async move { conn.put_bincode_cells_with_retry::("tx", &tx_cells) .await })); } if !tx_by_addr_cells.is_empty() { let conn = self.connection.clone(); tasks.push(tokio::spawn(async move { conn.put_protobuf_cells_with_retry::( "tx-by-addr", &tx_by_addr_cells, ) .await })); } if num_entries > 0 { let conn = self.connection.clone(); tasks.push(tokio::spawn(async move { conn.put_protobuf_cells_with_retry::("entries", &[entry_cell]) .await })); } let mut bytes_written = 0; let mut maybe_first_err: Option = None; let results = futures::future::join_all(tasks).await; for result in results { match result { Err(err) => { if maybe_first_err.is_none() { maybe_first_err = Some(Error::TokioJoinError(err)); } } Ok(Err(err)) => { if maybe_first_err.is_none() { maybe_first_err = Some(Error::BigTableError(err)); } } Ok(Ok(bytes)) => { bytes_written += bytes; } } } if let Some(err) = maybe_first_err { return Err(err); } let num_transactions = confirmed_block.transactions.len(); // Store the block itself last, after all other metadata about the block has been // successfully stored. This avoids partial uploaded blocks from becoming visible to // `get_confirmed_block()` and `get_confirmed_blocks()` let blocks_cells = [(slot_to_blocks_key(slot), confirmed_block.into())]; bytes_written += self .connection .put_protobuf_cells_with_retry::("blocks", &blocks_cells) .await?; datapoint_info!( "storage-bigtable-upload-block", ("slot", slot, i64), ("transactions", num_transactions, i64), ("entries", num_entries, i64), ("bytes", bytes_written, i64), ); Ok(()) } // Delete a confirmed block and associated meta data. pub async fn delete_confirmed_block(&self, slot: Slot, dry_run: bool) -> Result<()> { let mut addresses: HashSet<&Pubkey> = HashSet::new(); let mut expected_tx_infos: HashMap = HashMap::new(); let confirmed_block = self.get_confirmed_block(slot).await?; for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() { match transaction_with_meta { TransactionWithStatusMeta::MissingMetadata(transaction) => { let signature = transaction.signatures[0]; let index = index as u32; let err = None; for address in transaction.message.account_keys.iter() { if !is_sysvar_id(address) { addresses.insert(address); } } expected_tx_infos.insert( signature.to_string(), UploadedTransaction { slot, index, err }, ); } TransactionWithStatusMeta::Complete(tx_with_meta) => { let VersionedTransactionWithStatusMeta { transaction, meta } = tx_with_meta; let signature = transaction.signatures[0]; let index = index as u32; let err = meta.status.clone().err(); for address in tx_with_meta.account_keys().iter() { if !is_sysvar_id(address) { addresses.insert(address); } } expected_tx_infos.insert( signature.to_string(), UploadedTransaction { slot, index, err }, ); } } } let address_slot_rows: Vec<_> = addresses .into_iter() .map(|address| format!("{}/{}", address, slot_to_tx_by_addr_key(slot))) .collect(); let tx_deletion_rows = if !expected_tx_infos.is_empty() { let signatures = expected_tx_infos.keys().cloned().collect::>(); let fetched_tx_infos: HashMap> = self.connection .get_bincode_cells_with_retry::("tx", &signatures) .await? .into_iter() .map(|(signature, tx_info_res)| (signature, tx_info_res.map(Into::into))) .collect::>(); let mut deletion_rows = Vec::with_capacity(expected_tx_infos.len()); for (signature, expected_tx_info) in expected_tx_infos { match fetched_tx_infos.get(&signature) { Some(Ok(fetched_tx_info)) if fetched_tx_info == &expected_tx_info => { deletion_rows.push(signature); } Some(Ok(fetched_tx_info)) => { warn!( "skipped tx row {} because the bigtable entry ({:?}) did not match to {:?}", signature, fetched_tx_info, &expected_tx_info, ); } Some(Err(err)) => { warn!( "skipped tx row {} because the bigtable entry was corrupted: {:?}", signature, err ); } None => { warn!("skipped tx row {} because it was not found", signature); } } } deletion_rows } else { vec![] }; let entries_exist = self .connection .client() .row_key_exists("entries", slot_to_entries_key(slot)) .await .is_ok_and(|x| x); if !dry_run { if !address_slot_rows.is_empty() { self.connection .delete_rows_with_retry("tx-by-addr", &address_slot_rows) .await?; } if !tx_deletion_rows.is_empty() { self.connection .delete_rows_with_retry("tx", &tx_deletion_rows) .await?; } if entries_exist { self.connection .delete_rows_with_retry("entries", &[slot_to_entries_key(slot)]) .await?; } self.connection .delete_rows_with_retry("blocks", &[slot_to_blocks_key(slot)]) .await?; } info!( "{}deleted ledger data for slot {}: {} transaction rows, {} address slot rows, {} entry row", if dry_run { "[dry run] " } else { "" }, slot, tx_deletion_rows.len(), address_slot_rows.len(), if entries_exist { "with" } else {"WITHOUT"} ); Ok(()) } } #[cfg(test)] mod test { use super::*; #[test] fn test_slot_to_key() { assert_eq!(slot_to_key(0), "0000000000000000"); assert_eq!(slot_to_key(!0), "ffffffffffffffff"); } }