#![allow(clippy::integer_arithmetic)] use { log::*, serde::{Deserialize, Serialize}, solana_metrics::inc_new_counter_debug, solana_sdk::{ clock::{Slot, UnixTimestamp}, deserialize_utils::default_on_eof, message::v0::LoadedAddresses, pubkey::Pubkey, signature::Signature, sysvar::is_sysvar_id, transaction::{TransactionError, VersionedTransaction}, }, solana_storage_proto::convert::{generated, tx_by_addr}, solana_transaction_status::{ extract_and_fmt_memos, ConfirmedBlock, ConfirmedTransactionStatusWithSignature, ConfirmedTransactionWithStatusMeta, Reward, TransactionByAddrInfo, TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta, TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedTransactionWithStatusMeta, }, std::{ collections::{HashMap, HashSet}, convert::TryInto, }, thiserror::Error, }; #[macro_use] extern crate solana_metrics; #[macro_use] extern crate serde_derive; mod access_token; mod bigtable; mod compression; mod root_ca_certificate; #[derive(Debug, Error)] pub enum Error { #[error("BigTable: {0}")] BigTableError(bigtable::Error), #[error("I/O Error: {0}")] IoError(std::io::Error), #[error("Transaction encoded is not supported")] UnsupportedTransactionEncoding, #[error("Block not found: {0}")] BlockNotFound(Slot), #[error("Signature not found")] SignatureNotFound, } impl std::convert::From for Error { fn from(err: bigtable::Error) -> Self { Self::BigTableError(err) } } impl std::convert::From for Error { fn from(err: std::io::Error) -> Self { Self::IoError(err) } } pub type Result = std::result::Result; // Convert a slot to its bucket representation whereby lower slots are always lexically ordered // before higher slots fn slot_to_key(slot: Slot) -> String { format!("{:016x}", slot) } fn slot_to_blocks_key(slot: Slot) -> String { slot_to_key(slot) } fn slot_to_tx_by_addr_key(slot: Slot) -> String { slot_to_key(!slot) } // Reverse of `slot_to_key` fn key_to_slot(key: &str) -> Option { match Slot::from_str_radix(key, 16) { Ok(slot) => Some(slot), Err(err) => { // bucket data is probably corrupt warn!("Failed to parse object key as a slot: {}: {}", key, err); None } } } // A serialized `StoredConfirmedBlock` is stored in the `block` table // // StoredConfirmedBlock holds the same contents as ConfirmedBlock, but is slightly compressed and avoids // some serde JSON directives that cause issues with bincode // // Note: in order to continue to support old bincode-serialized bigtable entries, if new fields are // added to ConfirmedBlock, they must either be excluded or set to `default_on_eof` here // #[derive(Serialize, Deserialize)] struct StoredConfirmedBlock { previous_blockhash: String, blockhash: String, parent_slot: Slot, transactions: Vec, rewards: StoredConfirmedBlockRewards, block_time: Option, #[serde(deserialize_with = "default_on_eof")] block_height: Option, } #[cfg(test)] impl From for StoredConfirmedBlock { fn from(confirmed_block: ConfirmedBlock) -> Self { let ConfirmedBlock { previous_blockhash, blockhash, parent_slot, transactions, rewards, block_time, block_height, } = confirmed_block; Self { previous_blockhash, blockhash, parent_slot, transactions: transactions.into_iter().map(|tx| tx.into()).collect(), rewards: rewards.into_iter().map(|reward| reward.into()).collect(), block_time, block_height, } } } impl From for ConfirmedBlock { fn from(confirmed_block: StoredConfirmedBlock) -> Self { let StoredConfirmedBlock { previous_blockhash, blockhash, parent_slot, transactions, rewards, block_time, block_height, } = confirmed_block; Self { previous_blockhash, blockhash, parent_slot, transactions: transactions.into_iter().map(|tx| tx.into()).collect(), rewards: rewards.into_iter().map(|reward| reward.into()).collect(), block_time, block_height, } } } #[derive(Serialize, Deserialize)] struct StoredConfirmedBlockTransaction { transaction: VersionedTransaction, meta: Option, } #[cfg(test)] impl From for StoredConfirmedBlockTransaction { fn from(value: TransactionWithStatusMeta) -> Self { match value { TransactionWithStatusMeta::MissingMetadata(transaction) => Self { transaction: VersionedTransaction::from(transaction), meta: None, }, TransactionWithStatusMeta::Complete(VersionedTransactionWithStatusMeta { transaction, meta, }) => Self { transaction, meta: Some(meta.into()), }, } } } impl From for TransactionWithStatusMeta { fn from(tx_with_meta: StoredConfirmedBlockTransaction) -> Self { let StoredConfirmedBlockTransaction { transaction, meta } = tx_with_meta; match meta { None => Self::MissingMetadata( transaction .into_legacy_transaction() .expect("versioned transactions always have meta"), ), Some(meta) => Self::Complete(VersionedTransactionWithStatusMeta { transaction, meta: meta.into(), }), } } } #[derive(Serialize, Deserialize)] struct StoredConfirmedBlockTransactionStatusMeta { err: Option, fee: u64, pre_balances: Vec, post_balances: Vec, } impl From for TransactionStatusMeta { fn from(value: StoredConfirmedBlockTransactionStatusMeta) -> Self { let StoredConfirmedBlockTransactionStatusMeta { err, fee, pre_balances, post_balances, } = value; let status = match &err { None => Ok(()), Some(err) => Err(err.clone()), }; Self { status, fee, pre_balances, post_balances, inner_instructions: None, log_messages: None, pre_token_balances: None, post_token_balances: None, rewards: None, loaded_addresses: LoadedAddresses::default(), } } } impl From for StoredConfirmedBlockTransactionStatusMeta { fn from(value: TransactionStatusMeta) -> Self { let TransactionStatusMeta { status, fee, pre_balances, post_balances, .. } = value; Self { err: status.err(), fee, pre_balances, post_balances, } } } type StoredConfirmedBlockRewards = Vec; #[derive(Serialize, Deserialize)] struct StoredConfirmedBlockReward { pubkey: String, lamports: i64, } impl From for Reward { fn from(value: StoredConfirmedBlockReward) -> Self { let StoredConfirmedBlockReward { pubkey, lamports } = value; Self { pubkey, lamports, post_balance: 0, reward_type: None, commission: None, } } } impl From for StoredConfirmedBlockReward { fn from(value: Reward) -> Self { let Reward { pubkey, lamports, .. } = value; Self { pubkey, lamports } } } // A serialized `TransactionInfo` is stored in the `tx` table #[derive(Serialize, Deserialize, PartialEq, Debug)] struct TransactionInfo { slot: Slot, // The slot that contains the block with this transaction in it index: u32, // Where the transaction is located in the block err: Option, // None if the transaction executed successfully memo: Option, // Transaction memo } // Part of a serialized `TransactionInfo` which is stored in the `tx` table #[derive(PartialEq, Debug)] struct UploadedTransaction { slot: Slot, // The slot that contains the block with this transaction in it index: u32, // Where the transaction is located in the block err: Option, // None if the transaction executed successfully } impl From for UploadedTransaction { fn from(transaction_info: TransactionInfo) -> Self { Self { slot: transaction_info.slot, index: transaction_info.index, err: transaction_info.err, } } } impl From for TransactionStatus { fn from(transaction_info: TransactionInfo) -> Self { let TransactionInfo { slot, err, .. } = transaction_info; let status = match &err { None => Ok(()), Some(err) => Err(err.clone()), }; Self { slot, confirmations: None, status, err, confirmation_status: Some(TransactionConfirmationStatus::Finalized), } } } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] struct LegacyTransactionByAddrInfo { pub signature: Signature, // The transaction signature pub err: Option, // None if the transaction executed successfully pub index: u32, // Where the transaction is located in the block pub memo: Option, // Transaction memo } impl From for TransactionByAddrInfo { fn from(legacy: LegacyTransactionByAddrInfo) -> Self { let LegacyTransactionByAddrInfo { signature, err, index, memo, } = legacy; Self { signature, err, index, memo, block_time: None, } } } #[derive(Clone)] pub struct LedgerStorage { connection: bigtable::BigTableConnection, } impl LedgerStorage { pub async fn new( read_only: bool, timeout: Option, credential_path: Option, ) -> Result { let connection = bigtable::BigTableConnection::new("solana-ledger", read_only, timeout, credential_path) .await?; Ok(Self { connection }) } /// Return the available slot that contains a block pub async fn get_first_available_block(&self) -> Result> { debug!("LedgerStorage::get_first_available_block request received"); inc_new_counter_debug!("storage-bigtable-query", 1); let mut bigtable = self.connection.client(); let blocks = bigtable.get_row_keys("blocks", None, None, 1).await?; if blocks.is_empty() { return Ok(None); } Ok(key_to_slot(&blocks[0])) } /// Fetch the next slots after the provided slot that contains a block /// /// start_slot: slot to start the search from (inclusive) /// limit: stop after this many slots have been found; if limit==0, all records in the table /// after start_slot will be read pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result> { debug!( "LedgerStorage::get_confirmed_blocks request received: {:?} {:?}", start_slot, limit ); inc_new_counter_debug!("storage-bigtable-query", 1); let mut bigtable = self.connection.client(); let blocks = bigtable .get_row_keys( "blocks", Some(slot_to_blocks_key(start_slot)), None, limit as i64, ) .await?; Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect()) } /// Fetch the confirmed block from the desired slot pub async fn get_confirmed_block(&self, slot: Slot) -> Result { debug!( "LedgerStorage::get_confirmed_block request received: {:?}", slot ); inc_new_counter_debug!("storage-bigtable-query", 1); let mut bigtable = self.connection.client(); let block_cell_data = bigtable .get_protobuf_or_bincode_cell::( "blocks", slot_to_blocks_key(slot), ) .await .map_err(|err| match err { bigtable::Error::RowNotFound => Error::BlockNotFound(slot), _ => err.into(), })?; Ok(match block_cell_data { bigtable::CellData::Bincode(block) => block.into(), bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| { bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_blocks_key(slot))) })?, }) } pub async fn get_signature_status(&self, signature: &Signature) -> Result { debug!( "LedgerStorage::get_signature_status request received: {:?}", signature ); inc_new_counter_debug!("storage-bigtable-query", 1); let mut bigtable = self.connection.client(); let transaction_info = bigtable .get_bincode_cell::("tx", signature.to_string()) .await .map_err(|err| match err { bigtable::Error::RowNotFound => Error::SignatureNotFound, _ => err.into(), })?; Ok(transaction_info.into()) } /// Fetch a confirmed transaction pub async fn get_confirmed_transaction( &self, signature: &Signature, ) -> Result> { debug!( "LedgerStorage::get_confirmed_transaction request received: {:?}", signature ); inc_new_counter_debug!("storage-bigtable-query", 1); let mut bigtable = self.connection.client(); // Figure out which block the transaction is located in let TransactionInfo { slot, index, .. } = bigtable .get_bincode_cell("tx", signature.to_string()) .await .map_err(|err| match err { bigtable::Error::RowNotFound => Error::SignatureNotFound, _ => err.into(), })?; // Load the block and return the transaction let block = self.get_confirmed_block(slot).await?; match block.transactions.into_iter().nth(index as usize) { None => { // report this somewhere actionable? warn!("Transaction info for {} is corrupt", signature); Ok(None) } Some(tx_with_meta) => { if tx_with_meta.transaction_signature() != signature { warn!( "Transaction info or confirmed block for {} is corrupt", signature ); Ok(None) } else { Ok(Some(ConfirmedTransactionWithStatusMeta { slot, tx_with_meta, block_time: block.block_time, })) } } } } /// Get confirmed signatures for the provided address, in descending ledger order /// /// address: address to search for /// before_signature: start with the first signature older than this one /// until_signature: end with the last signature more recent than this one /// limit: stop after this many signatures; if limit==0, all records in the table will be read pub async fn get_confirmed_signatures_for_address( &self, address: &Pubkey, before_signature: Option<&Signature>, until_signature: Option<&Signature>, limit: usize, ) -> Result< Vec<( ConfirmedTransactionStatusWithSignature, u32, /*slot index*/ )>, > { debug!( "LedgerStorage::get_confirmed_signatures_for_address request received: {:?}", address ); inc_new_counter_debug!("storage-bigtable-query", 1); let mut bigtable = self.connection.client(); let address_prefix = format!("{}/", address); // Figure out where to start listing from based on `before_signature` let (first_slot, before_transaction_index) = match before_signature { None => (Slot::MAX, 0), Some(before_signature) => { let TransactionInfo { slot, index, .. } = bigtable .get_bincode_cell("tx", before_signature.to_string()) .await?; (slot, index) } }; // Figure out where to end listing from based on `until_signature` let (last_slot, until_transaction_index) = match until_signature { None => (0, u32::MAX), Some(until_signature) => { let TransactionInfo { slot, index, .. } = bigtable .get_bincode_cell("tx", until_signature.to_string()) .await?; (slot, index) } }; let mut infos = vec![]; let starting_slot_tx_len = bigtable .get_protobuf_or_bincode_cell::, tx_by_addr::TransactionByAddr>( "tx-by-addr", format!("{}{}", address_prefix, slot_to_tx_by_addr_key(first_slot)), ) .await .map(|cell_data| { match cell_data { bigtable::CellData::Bincode(tx_by_addr) => tx_by_addr.len(), bigtable::CellData::Protobuf(tx_by_addr) => tx_by_addr.tx_by_addrs.len(), } }) .unwrap_or(0); // Return the next tx-by-addr data of amount `limit` plus extra to account for the largest // number that might be flitered out let tx_by_addr_data = bigtable .get_row_data( "tx-by-addr", Some(format!( "{}{}", address_prefix, slot_to_tx_by_addr_key(first_slot), )), Some(format!( "{}{}", address_prefix, slot_to_tx_by_addr_key(last_slot), )), limit as i64 + starting_slot_tx_len as i64, ) .await?; 'outer: for (row_key, data) in tx_by_addr_data { let slot = !key_to_slot(&row_key[address_prefix.len()..]).ok_or_else(|| { bigtable::Error::ObjectCorrupt(format!( "Failed to convert key to slot: tx-by-addr/{}", row_key )) })?; let deserialized_cell_data = bigtable::deserialize_protobuf_or_bincode_cell_data::< Vec, tx_by_addr::TransactionByAddr, >(&data, "tx-by-addr", row_key.clone())?; let mut cell_data: Vec = match deserialized_cell_data { bigtable::CellData::Bincode(tx_by_addr) => { tx_by_addr.into_iter().map(|legacy| legacy.into()).collect() } bigtable::CellData::Protobuf(tx_by_addr) => { tx_by_addr.try_into().map_err(|error| { bigtable::Error::ObjectCorrupt(format!( "Failed to deserialize: {}: tx-by-addr/{}", error, row_key.clone() )) })? } }; cell_data.reverse(); for tx_by_addr_info in cell_data.into_iter() { // Filter out records before `before_transaction_index` if slot == first_slot && tx_by_addr_info.index >= before_transaction_index { continue; } // Filter out records after `until_transaction_index` if slot == last_slot && tx_by_addr_info.index <= until_transaction_index { continue; } infos.push(( ConfirmedTransactionStatusWithSignature { signature: tx_by_addr_info.signature, slot, err: tx_by_addr_info.err, memo: tx_by_addr_info.memo, block_time: tx_by_addr_info.block_time, }, tx_by_addr_info.index, )); // Respect limit if infos.len() >= limit { break 'outer; } } } Ok(infos) } // Upload a new confirmed block and associated meta data. pub async fn upload_confirmed_block( &self, slot: Slot, confirmed_block: VersionedConfirmedBlock, ) -> Result<()> { let mut bytes_written = 0; let mut by_addr: HashMap<&Pubkey, Vec> = HashMap::new(); let mut tx_cells = vec![]; for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() { let VersionedTransactionWithStatusMeta { meta, transaction } = transaction_with_meta; let err = meta.status.clone().err(); let index = index as u32; let signature = transaction.signatures[0]; let memo = extract_and_fmt_memos(transaction_with_meta); for address in transaction_with_meta.account_keys().iter() { if !is_sysvar_id(address) { by_addr .entry(address) .or_default() .push(TransactionByAddrInfo { signature, err: err.clone(), index, memo: memo.clone(), block_time: confirmed_block.block_time, }); } } tx_cells.push(( signature.to_string(), TransactionInfo { slot, index, err, memo, }, )); } let tx_by_addr_cells: Vec<_> = by_addr .into_iter() .map(|(address, transaction_info_by_addr)| { ( format!("{}/{}", address, slot_to_tx_by_addr_key(slot)), tx_by_addr::TransactionByAddr { tx_by_addrs: transaction_info_by_addr .into_iter() .map(|by_addr| by_addr.into()) .collect(), }, ) }) .collect(); if !tx_cells.is_empty() { bytes_written += self .connection .put_bincode_cells_with_retry::("tx", &tx_cells) .await?; } if !tx_by_addr_cells.is_empty() { bytes_written += self .connection .put_protobuf_cells_with_retry::( "tx-by-addr", &tx_by_addr_cells, ) .await?; } let num_transactions = confirmed_block.transactions.len(); // Store the block itself last, after all other metadata about the block has been // successfully stored. This avoids partial uploaded blocks from becoming visible to // `get_confirmed_block()` and `get_confirmed_blocks()` let blocks_cells = [(slot_to_blocks_key(slot), confirmed_block.into())]; bytes_written += self .connection .put_protobuf_cells_with_retry::("blocks", &blocks_cells) .await?; info!( "uploaded block for slot {}: {} transactions, {} bytes", slot, num_transactions, bytes_written ); Ok(()) } // Delete a confirmed block and associated meta data. pub async fn delete_confirmed_block(&self, slot: Slot, dry_run: bool) -> Result<()> { let mut addresses: HashSet<&Pubkey> = HashSet::new(); let mut expected_tx_infos: HashMap = HashMap::new(); let confirmed_block = self.get_confirmed_block(slot).await?; for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() { match transaction_with_meta { TransactionWithStatusMeta::MissingMetadata(transaction) => { let signature = transaction.signatures[0]; let index = index as u32; let err = None; for address in transaction.message.account_keys.iter() { if !is_sysvar_id(address) { addresses.insert(address); } } expected_tx_infos.insert( signature.to_string(), UploadedTransaction { slot, index, err }, ); } TransactionWithStatusMeta::Complete(tx_with_meta) => { let VersionedTransactionWithStatusMeta { transaction, meta } = tx_with_meta; let signature = transaction.signatures[0]; let index = index as u32; let err = meta.status.clone().err(); for address in tx_with_meta.account_keys().iter() { if !is_sysvar_id(address) { addresses.insert(address); } } expected_tx_infos.insert( signature.to_string(), UploadedTransaction { slot, index, err }, ); } } } let address_slot_rows: Vec<_> = addresses .into_iter() .map(|address| format!("{}/{}", address, slot_to_tx_by_addr_key(slot))) .collect(); let tx_deletion_rows = if !expected_tx_infos.is_empty() { let signatures = expected_tx_infos .iter() .map(|(signature, _info)| signature) .cloned() .collect::>(); let fetched_tx_infos: HashMap> = self.connection .get_bincode_cells_with_retry::("tx", &signatures) .await? .into_iter() .map(|(signature, tx_info_res)| (signature, tx_info_res.map(Into::into))) .collect::>(); let mut deletion_rows = Vec::with_capacity(expected_tx_infos.len()); for (signature, expected_tx_info) in expected_tx_infos { match fetched_tx_infos.get(&signature) { Some(Ok(fetched_tx_info)) if fetched_tx_info == &expected_tx_info => { deletion_rows.push(signature); } Some(Ok(fetched_tx_info)) => { warn!( "skipped tx row {} because the bigtable entry ({:?}) did not match to {:?}", signature, fetched_tx_info, &expected_tx_info, ); } Some(Err(err)) => { warn!( "skipped tx row {} because the bigtable entry was corrupted: {:?}", signature, err ); } None => { warn!("skipped tx row {} because it was not found", signature); } } } deletion_rows } else { vec![] }; if !dry_run { if !address_slot_rows.is_empty() { self.connection .delete_rows_with_retry("tx-by-addr", &address_slot_rows) .await?; } if !tx_deletion_rows.is_empty() { self.connection .delete_rows_with_retry("tx", &tx_deletion_rows) .await?; } self.connection .delete_rows_with_retry("blocks", &[slot_to_blocks_key(slot)]) .await?; } info!( "{}deleted ledger data for slot {}: {} transaction rows, {} address slot rows", if dry_run { "[dry run] " } else { "" }, slot, tx_deletion_rows.len(), address_slot_rows.len() ); Ok(()) } } #[cfg(test)] mod test { use super::*; #[test] fn test_slot_to_key() { assert_eq!(slot_to_key(0), "0000000000000000"); assert_eq!(slot_to_key(!0), "ffffffffffffffff"); } }