diff --git a/storage-bigtable/Cargo.toml b/storage-bigtable/Cargo.toml
index 4b769cc8fd..d1635d3422 100644
--- a/storage-bigtable/Cargo.toml
+++ b/storage-bigtable/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "solana-storage-bigtable"
-version = "1.1.22"
+version = "1.4.0"
 description = "Solana Storage BigTable"
 authors = ["Solana Maintainers <maintainers@solana.com>"]
 repository = "https://github.com/solana-labs/solana"
@@ -9,18 +9,23 @@ homepage = "https://solana.com/"
 edition = "2018"
 
 [dependencies]
+backoff = {version="0.2.1", features = ["tokio"]}
 bincode = "1.2.1"
+bzip2 = "0.3.3"
+enum-iterator = "0.6.0"
+flate2 = "1.0.14"
 goauth = "0.7.2"
 log = "0.4.8"
-smpl_jwt = "0.5.0"
-tonic = {version="0.3.0", features = ["tls", "transport"]}
 prost = "0.6.1"
 prost-types = "0.6.1"
-enum-iterator = "0.6.0"
-bzip2 = "0.3.3"
-flate2 = "1.0.14"
 serde = "1.0.112"
 serde_derive = "1.0.103"
+smpl_jwt = "0.5.0"
+solana-sdk = { path = "../sdk", version = "1.1.20" }
+solana-transaction-status = { path = "../transaction-status", version = "1.1.20" }
+thiserror = "1.0"
+futures = "0.3.5"
+tonic = {version="0.3.0", features = ["tls", "transport"]}
 zstd = "0.5.1"
 
 [lib]
diff --git a/storage-bigtable/src/bigtable.rs b/storage-bigtable/src/bigtable.rs
new file mode 100644
index 0000000000..675fca3003
--- /dev/null
+++ b/storage-bigtable/src/bigtable.rs
@@ -0,0 +1,471 @@
+// Primitives for reading/writing BigTable tables
+
+use crate::access_token::{AccessToken, Scope};
+use crate::compression::{compress_best, decompress};
+use crate::root_ca_certificate;
+use log::*;
+use std::sync::{Arc, RwLock};
+use thiserror::Error;
+use tonic::{metadata::MetadataValue, transport::ClientTlsConfig, Request};
+
+mod google {
+    mod rpc {
+        include!(concat!(
+            env!("CARGO_MANIFEST_DIR"),
+            concat!("/proto/google.rpc.rs")
+        ));
+    }
+    pub mod bigtable {
+        pub mod v2 {
+            include!(concat!(
+                env!("CARGO_MANIFEST_DIR"),
+                concat!("/proto/google.bigtable.v2.rs")
+            ));
+        }
+    }
+}
+use google::bigtable::v2::*;
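The `include!`d files are pre-generated prost/tonic bindings checked in under `proto/`; this diff does not show how they were produced. A plausible one-off generator using `tonic-build` 0.3 might look like the sketch below (the `googleapis/` checkout path is an assumption):

```rust
// Hypothetical one-off generator; the resulting .rs files are simply checked
// in under proto/. prost emits one file per protobuf package, so compiling
// bigtable.proto also yields google.rpc.rs for its google/rpc/status.proto
// dependency.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .build_server(false) // client bindings only
        .out_dir("proto")
        .compile(
            &["googleapis/google/bigtable/v2/bigtable.proto"],
            &["googleapis"],
        )?;
    Ok(())
}
```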
+
+pub type RowKey = String;
+pub type CellName = String;
+pub type CellValue = Vec<u8>;
+pub type RowData = Vec<(CellName, CellValue)>;
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("AccessToken error: {0}")]
+    AccessTokenError(String),
+
+    #[error("Certificate error: {0}")]
+    CertificateError(String),
+
+    #[error("I/O Error: {0}")]
+    IoError(std::io::Error),
+
+    #[error("Transport error: {0}")]
+    TransportError(tonic::transport::Error),
+
+    #[error("Invalid URI {0}: {1}")]
+    InvalidUri(String, String),
+
+    #[error("Row not found")]
+    RowNotFound,
+
+    #[error("Row write failed")]
+    RowWriteFailed,
+
+    #[error("Object not found: {0}")]
+    ObjectNotFound(String),
+
+    #[error("Object is corrupt: {0}")]
+    ObjectCorrupt(String),
+
+    #[error("RPC error: {0}")]
+    RpcError(tonic::Status),
+}
+
+impl std::convert::From<std::io::Error> for Error {
+    fn from(err: std::io::Error) -> Self {
+        Self::IoError(err)
+    }
+}
+
+impl std::convert::From<tonic::transport::Error> for Error {
+    fn from(err: tonic::transport::Error) -> Self {
+        Self::TransportError(err)
+    }
+}
+
+impl std::convert::From<tonic::Status> for Error {
+    fn from(err: tonic::Status) -> Self {
+        Self::RpcError(err)
+    }
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+#[derive(Clone)]
+pub struct BigTableConnection {
+    access_token: Option<Arc<RwLock<AccessToken>>>,
+    channel: tonic::transport::Channel,
+    table_prefix: String,
+}
+
+impl BigTableConnection {
+    /// Establish a connection to the BigTable instance named `instance_name`. If read-only access
+    /// is required, the `read_only` flag should be used to reduce the requested OAuth2 scope.
+    ///
+    /// The GOOGLE_APPLICATION_CREDENTIALS environment variable will be used to determine the
+    /// project name that contains the BigTable instance, in addition to access credentials.
+    ///
+    /// The BIGTABLE_EMULATOR_HOST environment variable is also respected.
+    ///
+    pub async fn new(instance_name: &str, read_only: bool) -> Result<Self> {
+        match std::env::var("BIGTABLE_EMULATOR_HOST") {
+            Ok(endpoint) => {
+                info!("Connecting to bigtable emulator at {}", endpoint);
+
+                Ok(Self {
+                    access_token: None,
+                    channel: tonic::transport::Channel::from_shared(format!("http://{}", endpoint))
+                        .map_err(|err| Error::InvalidUri(endpoint, err.to_string()))?
+                        .connect_lazy()?,
+                    table_prefix: format!("projects/emulator/instances/{}/tables/", instance_name),
+                })
+            }
+
+            Err(_) => {
+                let mut access_token = AccessToken::new(if read_only {
+                    &Scope::BigTableDataReadOnly
+                } else {
+                    &Scope::BigTableData
+                })
+                .map_err(Error::AccessTokenError)?;
+
+                access_token.refresh().await;
+                let table_prefix = format!(
+                    "projects/{}/instances/{}/tables/",
+                    access_token.project(),
+                    instance_name
+                );
+
+                Ok(Self {
+                    access_token: Some(Arc::new(RwLock::new(access_token))),
+                    channel: tonic::transport::Channel::from_static(
+                        "https://bigtable.googleapis.com",
+                    )
+                    .tls_config(
+                        ClientTlsConfig::new()
+                            .ca_certificate(
+                                root_ca_certificate::load().map_err(Error::CertificateError)?,
+                            )
+                            .domain_name("bigtable.googleapis.com"),
+                    )?
+                    .connect_lazy()?,
+                    table_prefix,
+                })
+            }
+        }
+    }
+
+    /// Create a new BigTable client.
+    ///
+    /// Clients require `&mut self`, due to `tonic::transport::Channel` limitations; however,
+    /// creating new clients is cheap, so creating one per use is a reasonable workaround.
+    pub fn client(&self) -> BigTable {
+        let client = {
+            if let Some(ref access_token) = self.access_token {
+                let access_token = access_token.clone();
+                bigtable_client::BigtableClient::with_interceptor(
+                    self.channel.clone(),
+                    move |mut req: Request<()>| {
+                        match access_token.read().unwrap().get() {
+                            Ok(access_token) => match MetadataValue::from_str(&access_token) {
+                                Ok(authorization_header) => {
+                                    req.metadata_mut()
+                                        .insert("authorization", authorization_header);
+                                }
+                                Err(err) => warn!("Failed to set authorization header: {}", err),
+                            },
+                            Err(err) => warn!("{}", err),
+                        }
+                        Ok(req)
+                    },
+                )
+            } else {
+                bigtable_client::BigtableClient::new(self.channel.clone())
+            }
+        };
+        BigTable {
+            access_token: self.access_token.clone(),
+            client,
+            table_prefix: self.table_prefix.clone(),
+        }
+    }
+
+    pub async fn put_bincode_cells_with_retry<T>(
+        &self,
+        table: &str,
+        cells: &[(RowKey, T)],
+    ) -> Result<usize>
+    where
+        T: serde::ser::Serialize,
+    {
+        use backoff::{future::FutureOperation as _, ExponentialBackoff};
+        (|| async {
+            let mut client = self.client();
+            Ok(client.put_bincode_cells(table, cells).await?)
+        })
+        .retry(ExponentialBackoff::default())
+        .await
+    }
+}
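A minimal end-to-end sketch of the intended usage, assuming a local Cloud BigTable emulator (`cbtemulator`) on `localhost:8086` with a `test` table whose column family is `x` (the family `put_row_data` writes to). It is written as if from inside this crate, since the `bigtable` module is private:

```rust
use crate::bigtable::BigTableConnection;

#[tokio::main] // tokio 0.2, matching tonic 0.3
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    std::env::set_var("BIGTABLE_EMULATOR_HOST", "localhost:8086");

    // `read_only: false` requests the read/write OAuth2 scope (unused against
    // the emulator, which skips authentication entirely)
    let connection = BigTableConnection::new("test-instance", false).await?;

    // Writes retry with exponential backoff on transient failures
    connection
        .put_bincode_cells_with_retry("test", &[("a-key".to_string(), 42u32)])
        .await?;

    let mut client = connection.client();
    let value: u32 = client.get_bincode_cell("test", "a-key".to_string()).await?;
    assert_eq!(value, 42);
    Ok(())
}
```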
+
+pub struct BigTable {
+    access_token: Option<Arc<RwLock<AccessToken>>>,
+    client: bigtable_client::BigtableClient<tonic::transport::Channel>,
+    table_prefix: String,
+}
+
+impl BigTable {
+    async fn decode_read_rows_response(
+        mut rrr: tonic::codec::Streaming<ReadRowsResponse>,
+    ) -> Result<Vec<(RowKey, RowData)>> {
+        let mut rows: Vec<(RowKey, RowData)> = vec![];
+
+        let mut row_key = None;
+        let mut row_data = vec![];
+
+        let mut cell_name = None;
+        let mut cell_timestamp = 0;
+        let mut cell_value = vec![];
+        let mut cell_version_ok = true;
+        while let Some(res) = rrr.message().await? {
+            for (i, mut chunk) in res.chunks.into_iter().enumerate() {
+                // The comments for `read_rows_response::CellChunk` provide essential details for
+                // understanding how the below decoding works...
+                trace!("chunk {}: {:?}", i, chunk);
+
+                // Starting a new row?
+                if !chunk.row_key.is_empty() {
+                    row_key = String::from_utf8(chunk.row_key).ok(); // Require UTF-8 for row keys
+                }
+
+                // Starting a new cell?
+                if let Some(qualifier) = chunk.qualifier {
+                    if let Some(cell_name) = cell_name {
+                        row_data.push((cell_name, cell_value));
+                        cell_value = vec![];
+                    }
+                    cell_name = String::from_utf8(qualifier).ok(); // Require UTF-8 for cell names
+                    cell_timestamp = chunk.timestamp_micros;
+                    cell_version_ok = true;
+                } else {
+                    // Continuing the existing cell. Check if this is the start of another version
+                    // of the cell
+                    if chunk.timestamp_micros != 0 {
+                        if chunk.timestamp_micros < cell_timestamp {
+                            cell_version_ok = false; // ignore older versions of the cell
+                        } else {
+                            // newer version of the cell; discard the older cell
+                            cell_version_ok = true;
+                            cell_value = vec![];
+                            cell_timestamp = chunk.timestamp_micros;
+                        }
+                    }
+                }
+                if cell_version_ok {
+                    cell_value.append(&mut chunk.value);
+                }
+
+                // End of a row?
+                if chunk.row_status.is_some() {
+                    if let Some(read_rows_response::cell_chunk::RowStatus::CommitRow(_)) =
+                        chunk.row_status
+                    {
+                        if let Some(cell_name) = cell_name {
+                            row_data.push((cell_name, cell_value));
+                        }
+
+                        if let Some(row_key) = row_key {
+                            rows.push((row_key, row_data))
+                        }
+                    }
+
+                    row_key = None;
+                    row_data = vec![];
+                    cell_value = vec![];
+                    cell_name = None;
+                }
+            }
+        }
+        Ok(rows)
+    }
+
+    async fn refresh_access_token(&self) {
+        if let Some(ref access_token) = self.access_token {
+            access_token.write().unwrap().refresh().await;
+        }
+    }
+
+    /// Get `table` row keys in lexical order.
+    ///
+    /// If `start_at` is provided, the row key listing will start with that key.
+    /// Otherwise the listing will start from the start of the table.
+    pub async fn get_row_keys(
+        &mut self,
+        table_name: &str,
+        start_at: Option<RowKey>,
+        rows_limit: i64,
+    ) -> Result<Vec<RowKey>> {
+        self.refresh_access_token().await;
+
+        let response = self
+            .client
+            .read_rows(ReadRowsRequest {
+                table_name: format!("{}{}", self.table_prefix, table_name),
+                rows_limit,
+                rows: Some(RowSet {
+                    row_keys: vec![],
+                    row_ranges: if let Some(row_key) = start_at {
+                        vec![RowRange {
+                            start_key: Some(row_range::StartKey::StartKeyClosed(
+                                row_key.into_bytes(),
+                            )),
+                            end_key: None,
+                        }]
+                    } else {
+                        vec![]
+                    },
+                }),
+                filter: Some(RowFilter {
+                    filter: Some(row_filter::Filter::Chain(row_filter::Chain {
+                        filters: vec![
+                            RowFilter {
+                                // Return minimal number of cells
+                                filter: Some(row_filter::Filter::CellsPerRowLimitFilter(1)),
+                            },
+                            RowFilter {
+                                // Only return the latest version of each cell
+                                filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
+                            },
+                            RowFilter {
+                                // Strip the cell values
+                                filter: Some(row_filter::Filter::StripValueTransformer(true)),
+                            },
+                        ],
+                    })),
+                }),
+                ..ReadRowsRequest::default()
+            })
+            .await?
+            .into_inner();
+
+        let rows = Self::decode_read_rows_response(response).await?;
+        Ok(rows.into_iter().map(|r| r.0).collect())
+    }
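Since `get_row_keys` returns at most `rows_limit` keys and the start key is inclusive (`StartKeyClosed`), a caller pages through a large table by re-feeding the last key of each page. A hypothetical helper, not part of this change:

```rust
// Sketch: page through all row keys of a table, 1000 at a time. Each page
// after the first re-fetches its inclusive boundary key, which is dropped.
// Assumes keys are not deleted mid-scan.
async fn all_row_keys(bt: &mut BigTable, table: &str) -> Result<Vec<RowKey>> {
    let mut all = vec![];
    let mut start_at: Option<RowKey> = None;
    loop {
        let mut page = bt.get_row_keys(table, start_at.clone(), 1000).await?;
        if start_at.is_some() && !page.is_empty() {
            page.remove(0); // boundary key was already returned by the previous page
        }
        if page.is_empty() {
            break;
        }
        start_at = page.last().cloned();
        all.extend(page);
    }
    Ok(all)
}
```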
+
+    /// Get the latest data from a single row of `table`, identified by `row_key`.
+    ///
+    /// All column families are accepted, and only the latest version of each column cell will be
+    /// returned.
+    pub async fn get_row_data(&mut self, table_name: &str, row_key: RowKey) -> Result<RowData> {
+        self.refresh_access_token().await;
+
+        let response = self
+            .client
+            .read_rows(ReadRowsRequest {
+                table_name: format!("{}{}", self.table_prefix, table_name),
+                rows_limit: 1,
+                rows: Some(RowSet {
+                    row_keys: vec![row_key.into_bytes()],
+                    row_ranges: vec![],
+                }),
+                filter: Some(RowFilter {
+                    // Only return the latest version of each cell
+                    filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
+                }),
+                ..ReadRowsRequest::default()
+            })
+            .await?
+            .into_inner();
+
+        let rows = Self::decode_read_rows_response(response).await?;
+        rows.into_iter()
+            .next()
+            .map(|r| r.1)
+            .ok_or_else(|| Error::RowNotFound)
+    }
+
+    /// Store data for one or more `table` rows in the `family_name` column family
+    async fn put_row_data(
+        &mut self,
+        table_name: &str,
+        family_name: &str,
+        row_data: &[(&RowKey, RowData)],
+    ) -> Result<()> {
+        self.refresh_access_token().await;
+
+        let mut entries = vec![];
+        for (row_key, row_data) in row_data {
+            let mutations = row_data
+                .iter()
+                .map(|(column_key, column_value)| Mutation {
+                    mutation: Some(mutation::Mutation::SetCell(mutation::SetCell {
+                        family_name: family_name.to_string(),
+                        column_qualifier: column_key.clone().into_bytes(),
+                        timestamp_micros: -1, // server assigned
+                        value: column_value.to_vec(),
+                    })),
+                })
+                .collect();
+
+            entries.push(mutate_rows_request::Entry {
+                row_key: (*row_key).clone().into_bytes(),
+                mutations,
+            });
+        }
+
+        let mut response = self
+            .client
+            .mutate_rows(MutateRowsRequest {
+                table_name: format!("{}{}", self.table_prefix, table_name),
+                entries,
+                ..MutateRowsRequest::default()
+            })
+            .await?
+            .into_inner();
+
+        while let Some(res) = response.message().await? {
+            for entry in res.entries {
+                if let Some(status) = entry.status {
+                    if status.code != 0 {
+                        eprintln!("put_row_data error {}: {}", status.code, status.message);
+                        warn!("put_row_data error {}: {}", status.code, status.message);
+                        return Err(Error::RowWriteFailed);
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    pub async fn get_bincode_cell<T>(&mut self, table: &str, key: RowKey) -> Result<T>
+    where
+        T: serde::de::DeserializeOwned,
+    {
+        let row_data = self.get_row_data(table, key.clone()).await?;
+
+        let value = row_data
+            .into_iter()
+            .find(|(name, _)| name == "bin")
+            .ok_or_else(|| Error::ObjectNotFound(format!("{}/{}", table, key)))?
+            .1;
+
+        let data = decompress(&value)?;
+        bincode::deserialize(&data).map_err(|err| {
+            warn!("Failed to deserialize {}/{}: {}", table, key, err);
+            Error::ObjectCorrupt(format!("{}/{}", table, key))
+        })
+    }
+
+    pub async fn put_bincode_cells<T>(
+        &mut self,
+        table: &str,
+        cells: &[(RowKey, T)],
+    ) -> Result<usize>
+    where
+        T: serde::ser::Serialize,
+    {
+        let mut bytes_written = 0;
+        let mut new_row_data = vec![];
+        for (row_key, data) in cells {
+            let data = compress_best(&bincode::serialize(&data).unwrap())?;
+            bytes_written += data.len();
+            new_row_data.push((row_key, vec![("bin".to_string(), data)]));
+        }
+
+        self.put_row_data(table, "x", &new_row_data).await?;
+        Ok(bytes_written)
+    }
+}
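`compress_best` and `decompress` come from the private `compression` module, which this diff does not show. A simplified stand-in that captures the contract, compress only when it helps and tag the bytes with the method used, might look like the sketch below; the one-byte tag format is illustrative only, and the real module also tries bzip2 and zstd:

```rust
// Simplified stand-in for the `compression` module (not the actual wire format).
use std::io::{Read, Write};

const METHOD_NONE: u8 = 0;
const METHOD_GZIP: u8 = 1;

pub fn compress_best(data: &[u8]) -> std::io::Result<Vec<u8>> {
    // Write the compressed stream after a one-byte method tag
    let mut encoder =
        flate2::write::GzEncoder::new(vec![METHOD_GZIP], flate2::Compression::default());
    encoder.write_all(data)?;
    let gzipped = encoder.finish()?;

    if gzipped.len() < data.len() + 1 {
        Ok(gzipped) // compression helped
    } else {
        let mut uncompressed = vec![METHOD_NONE];
        uncompressed.extend_from_slice(data);
        Ok(uncompressed) // store raw; compression would have grown the payload
    }
}

pub fn decompress(data: &[u8]) -> std::io::Result<Vec<u8>> {
    match data.split_first() {
        Some((&METHOD_NONE, rest)) => Ok(rest.to_vec()),
        Some((&METHOD_GZIP, rest)) => {
            let mut out = vec![];
            flate2::read::GzDecoder::new(rest).read_to_end(&mut out)?;
            Ok(out)
        }
        _ => Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "unknown compression method",
        )),
    }
}
```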
+ .1; + + let data = decompress(&value)?; + bincode::deserialize(&data).map_err(|err| { + warn!("Failed to deserialize {}/{}: {}", table, key, err); + Error::ObjectCorrupt(format!("{}/{}", table, key)) + }) + } + + pub async fn put_bincode_cells( + &mut self, + table: &str, + cells: &[(RowKey, T)], + ) -> Result + where + T: serde::ser::Serialize, + { + let mut bytes_written = 0; + let mut new_row_data = vec![]; + for (row_key, data) in cells { + let data = compress_best(&bincode::serialize(&data).unwrap())?; + bytes_written += data.len(); + new_row_data.push((row_key, vec![("bin".to_string(), data)])); + } + + self.put_row_data(table, "x", &new_row_data).await?; + Ok(bytes_written) + } +} diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs index a441dace1c..b921cecc0e 100644 --- a/storage-bigtable/src/lib.rs +++ b/storage-bigtable/src/lib.rs @@ -1,6 +1,536 @@ -mod access_token; -mod compression; -mod root_ca_certificate; +use log::*; +use serde::{Deserialize, Serialize}; +use solana_sdk::{ + clock::{Slot, UnixTimestamp}, + pubkey::Pubkey, + signature::Signature, + sysvar::is_sysvar_id, + transaction::{Transaction, TransactionError}, +}; +use solana_transaction_status::{ + ConfirmedBlock, ConfirmedTransaction, EncodedTransaction, Rewards, TransactionStatus, + TransactionWithStatusMeta, UiTransactionEncoding, UiTransactionStatusMeta, +}; +use std::{ + collections::HashMap, + convert::{TryFrom, TryInto}, +}; +use thiserror::Error; #[macro_use] extern crate serde_derive; + +mod access_token; +mod bigtable; +mod compression; +mod root_ca_certificate; + +#[derive(Debug, Error)] +pub enum Error { + #[error("BigTable: {0}")] + BigTableError(bigtable::Error), + + #[error("I/O Error: {0}")] + IoError(std::io::Error), + + #[error("Transaction encoded is not supported")] + UnsupportedTransactionEncoding, + + #[error("Block not found: {0}")] + BlockNotFound(Slot), + + #[error("Signature not found")] + SignatureNotFound, +} + +impl std::convert::From for Error { + fn from(err: bigtable::Error) -> Self { + Self::BigTableError(err) + } +} + +impl std::convert::From for Error { + fn from(err: std::io::Error) -> Self { + Self::IoError(err) + } +} + +pub type Result = std::result::Result; + +// Convert a slot to its bucket representation whereby lower slots are always lexically ordered +// before higher slots +fn slot_to_key(slot: Slot) -> String { + format!("{:016x}", slot) +} + +// Reverse of `slot_to_key` +fn key_to_slot(key: &str) -> Option { + match Slot::from_str_radix(key, 16) { + Ok(slot) => Some(slot), + Err(err) => { + // bucket data is probably corrupt + warn!("Failed to parse object key as a slot: {}: {}", key, err); + None + } + } +} + +// A serialized `StoredConfirmedBlock` is stored in the `block` table +// +// StoredConfirmedBlock holds the same contents as ConfirmedBlock, but is slightly compressed and avoids +// some serde JSON directives that cause issues with bincode +// +#[derive(Serialize, Deserialize)] +struct StoredConfirmedBlock { + previous_blockhash: String, + blockhash: String, + parent_slot: Slot, + transactions: Vec, + rewards: Rewards, + block_time: Option, +} + +impl StoredConfirmedBlock { + fn into_confirmed_block(self, encoding: UiTransactionEncoding) -> ConfirmedBlock { + let StoredConfirmedBlock { + previous_blockhash, + blockhash, + parent_slot, + transactions, + rewards, + block_time, + } = self; + + ConfirmedBlock { + previous_blockhash, + blockhash, + parent_slot, + transactions: transactions + .into_iter() + .map(|transaction| 
+
+// A serialized `StoredConfirmedBlock` is stored in the `blocks` table
+//
+// StoredConfirmedBlock holds the same contents as ConfirmedBlock, but is slightly compressed and
+// avoids some serde JSON directives that cause issues with bincode
+//
+#[derive(Serialize, Deserialize)]
+struct StoredConfirmedBlock {
+    previous_blockhash: String,
+    blockhash: String,
+    parent_slot: Slot,
+    transactions: Vec<StoredConfirmedBlockTransaction>,
+    rewards: Rewards,
+    block_time: Option<UnixTimestamp>,
+}
+
+impl StoredConfirmedBlock {
+    fn into_confirmed_block(self, encoding: UiTransactionEncoding) -> ConfirmedBlock {
+        let StoredConfirmedBlock {
+            previous_blockhash,
+            blockhash,
+            parent_slot,
+            transactions,
+            rewards,
+            block_time,
+        } = self;
+
+        ConfirmedBlock {
+            previous_blockhash,
+            blockhash,
+            parent_slot,
+            transactions: transactions
+                .into_iter()
+                .map(|transaction| transaction.into_transaction_with_status_meta(encoding))
+                .collect(),
+            rewards,
+            block_time,
+        }
+    }
+}
+
+impl TryFrom<ConfirmedBlock> for StoredConfirmedBlock {
+    type Error = Error;
+
+    fn try_from(confirmed_block: ConfirmedBlock) -> Result<Self> {
+        let ConfirmedBlock {
+            previous_blockhash,
+            blockhash,
+            parent_slot,
+            transactions,
+            rewards,
+            block_time,
+        } = confirmed_block;
+
+        let mut encoded_transactions = vec![];
+        for transaction in transactions.into_iter() {
+            encoded_transactions.push(transaction.try_into()?);
+        }
+
+        Ok(Self {
+            previous_blockhash,
+            blockhash,
+            parent_slot,
+            transactions: encoded_transactions,
+            rewards,
+            block_time,
+        })
+    }
+}
+
+#[derive(Serialize, Deserialize)]
+struct StoredConfirmedBlockTransaction {
+    transaction: Transaction,
+    meta: Option<StoredConfirmedBlockTransactionStatusMeta>,
+}
+
+impl StoredConfirmedBlockTransaction {
+    fn into_transaction_with_status_meta(
+        self,
+        encoding: UiTransactionEncoding,
+    ) -> TransactionWithStatusMeta {
+        let StoredConfirmedBlockTransaction { transaction, meta } = self;
+        TransactionWithStatusMeta {
+            transaction: EncodedTransaction::encode(transaction, encoding),
+            meta: meta.map(|meta| meta.into()),
+        }
+    }
+}
+
+impl TryFrom<TransactionWithStatusMeta> for StoredConfirmedBlockTransaction {
+    type Error = Error;
+
+    fn try_from(value: TransactionWithStatusMeta) -> Result<Self> {
+        let TransactionWithStatusMeta { transaction, meta } = value;
+
+        Ok(Self {
+            transaction: transaction
+                .decode()
+                .ok_or(Error::UnsupportedTransactionEncoding)?,
+            meta: meta.map(|meta| meta.into()),
+        })
+    }
+}
+
+#[derive(Serialize, Deserialize)]
+struct StoredConfirmedBlockTransactionStatusMeta {
+    err: Option<TransactionError>,
+    fee: u64,
+    pre_balances: Vec<u64>,
+    post_balances: Vec<u64>,
+}
+
+impl From<StoredConfirmedBlockTransactionStatusMeta> for UiTransactionStatusMeta {
+    fn from(value: StoredConfirmedBlockTransactionStatusMeta) -> Self {
+        let StoredConfirmedBlockTransactionStatusMeta {
+            err,
+            fee,
+            pre_balances,
+            post_balances,
+        } = value;
+        let status = match &err {
+            None => Ok(()),
+            Some(err) => Err(err.clone()),
+        };
+        Self {
+            err,
+            status,
+            fee,
+            pre_balances,
+            post_balances,
+        }
+    }
+}
+
+impl From<UiTransactionStatusMeta> for StoredConfirmedBlockTransactionStatusMeta {
+    fn from(value: UiTransactionStatusMeta) -> Self {
+        let UiTransactionStatusMeta {
+            err,
+            fee,
+            pre_balances,
+            post_balances,
+            ..
+        } = value;
+        Self {
+            err,
+            fee,
+            pre_balances,
+            post_balances,
+        }
+    }
+}
+
+// A serialized `TransactionInfo` is stored in the `tx` table
+#[derive(Serialize, Deserialize)]
+struct TransactionInfo {
+    slot: Slot, // The slot that contains the block with this transaction in it
+    index: u32, // Where the transaction is located in the block
+    err: Option<TransactionError>, // None if the transaction executed successfully
+    memo: Option<String>, // Transaction memo
+}
+
+impl From<TransactionInfo> for TransactionStatus {
+    fn from(transaction_info: TransactionInfo) -> Self {
+        let TransactionInfo { slot, err, .. } = transaction_info;
+        let status = match &err {
+            None => Ok(()),
+            Some(err) => Err(err.clone()),
+        };
+        Self {
+            slot,
+            confirmations: None,
+            status,
+            err,
+        }
+    }
+}
+
+// A serialized `Vec<TransactionByAddrInfo>` is stored in the `tx-by-addr` table.  The row keys are
+// the one's complement of the slot so that rows may be listed in reverse order
+#[derive(Serialize, Deserialize)]
+struct TransactionByAddrInfo {
+    signature: Signature,          // The transaction signature
+    err: Option<TransactionError>, // None if the transaction executed successfully
+    index: u32,                    // Where the transaction is located in the block
+    memo: Option<String>,          // Transaction memo
+}
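A quick illustration of the one's-complement trick (test-style, not part of the diff):

```rust
#[test]
fn tx_by_addr_keys_sort_newest_first() {
    let address = "Vote111111111111111111111111111111111111111"; // any base58 address works
    // `!slot` is the bitwise (one's) complement, so larger slots produce
    // lexically smaller keys and therefore list first
    let newer = format!("{}/{}", address, slot_to_key(!100));
    let older = format!("{}/{}", address, slot_to_key(!50));
    assert!(newer < older); // slot 100 lists before slot 50
    // `!` is self-inverse, so the reader recovers the slot with another `!`
    assert_eq!(!key_to_slot(&slot_to_key(!100)).unwrap(), 100);
}
```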
+
+#[derive(Clone)]
+pub struct LedgerStorage {
+    connection: bigtable::BigTableConnection,
+}
+
+impl LedgerStorage {
+    pub async fn new(read_only: bool) -> Result<Self> {
+        let connection = bigtable::BigTableConnection::new("solana-ledger", read_only).await?;
+        Ok(Self { connection })
+    }
+
+    /// Return the first available slot that contains a block
+    pub async fn get_first_available_block(&self) -> Result<Option<Slot>> {
+        let mut bigtable = self.connection.client();
+        let blocks = bigtable.get_row_keys("blocks", None, 1).await?;
+        if blocks.is_empty() {
+            return Ok(None);
+        }
+        Ok(key_to_slot(&blocks[0]))
+    }
+
+    /// Fetch the next slots after the provided slot that contain a block
+    ///
+    /// start_slot: slot to start the search from (inclusive)
+    /// limit: stop after this many slots have been found
+    pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result<Vec<Slot>> {
+        let mut bigtable = self.connection.client();
+        let blocks = bigtable
+            .get_row_keys("blocks", Some(slot_to_key(start_slot)), limit as i64)
+            .await?;
+        Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect())
+    }
+
+    /// Fetch the confirmed block from the desired slot
+    pub async fn get_confirmed_block(
+        &self,
+        slot: Slot,
+        encoding: UiTransactionEncoding,
+    ) -> Result<ConfirmedBlock> {
+        let mut bigtable = self.connection.client();
+        let block = bigtable
+            .get_bincode_cell::<StoredConfirmedBlock>("blocks", slot_to_key(slot))
+            .await?;
+        Ok(block.into_confirmed_block(encoding))
+    }
+
+    pub async fn get_signature_status(&self, signature: &Signature) -> Result<TransactionStatus> {
+        let mut bigtable = self.connection.client();
+        let transaction_info = bigtable
+            .get_bincode_cell::<TransactionInfo>("tx", signature.to_string())
+            .await?;
+        Ok(transaction_info.into())
+    }
+
+    /// Fetch a confirmed transaction
+    pub async fn get_confirmed_transaction(
+        &self,
+        signature: &Signature,
+        encoding: UiTransactionEncoding,
+    ) -> Result<Option<ConfirmedTransaction>> {
+        let mut bigtable = self.connection.client();
+
+        // Figure out which block the transaction is located in
+        let TransactionInfo { slot, index, .. } = bigtable
+            .get_bincode_cell("tx", signature.to_string())
+            .await?;
+
+        // Load the block and return the transaction
+        let block = bigtable
+            .get_bincode_cell::<StoredConfirmedBlock>("blocks", slot_to_key(slot))
+            .await?;
+        match block.transactions.into_iter().nth(index as usize) {
+            None => {
+                warn!("Transaction info for {} is corrupt", signature);
+                Ok(None)
+            }
+            Some(bucket_block_transaction) => {
+                if bucket_block_transaction.transaction.signatures[0] != *signature {
+                    warn!(
+                        "Transaction info or confirmed block for {} is corrupt",
+                        signature
+                    );
+                    Ok(None)
+                } else {
+                    Ok(Some(ConfirmedTransaction {
+                        slot,
+                        transaction: bucket_block_transaction
+                            .into_transaction_with_status_meta(encoding),
+                    }))
+                }
+            }
+        }
+    }
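A hypothetical read-side walk-through of the methods above: find the oldest stored block, then fetch the next few blocks after it.

```rust
// Sketch only; assumes credentials (or the emulator) are configured
async fn show_some_blocks() -> Result<()> {
    let storage = LedgerStorage::new(/*read_only=*/ true).await?;

    if let Some(first_slot) = storage.get_first_available_block().await? {
        for slot in storage.get_confirmed_blocks(first_slot, 10).await? {
            let block = storage
                .get_confirmed_block(slot, UiTransactionEncoding::Json)
                .await?;
            println!("slot {}: {} transactions", slot, block.transactions.len());
        }
    }
    Ok(())
}
```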
+
+    /// Get confirmed signatures for the provided address, in descending ledger order
+    ///
+    /// address: address to search for
+    /// start_after_signature: start with the first signature older than this one
+    /// limit: stop after this many signatures
+    pub async fn get_confirmed_signatures_for_address(
+        &self,
+        address: &Pubkey,
+        start_after_signature: Option<&Signature>,
+        limit: usize,
+    ) -> Result<Vec<(Signature, Slot, Option<String>, Option<TransactionError>)>> {
+        let mut bigtable = self.connection.client();
+        let address_prefix = format!("{}/", address);
+
+        // Figure out where to start listing from based on `start_after_signature`
+        let (first_slot, mut first_transaction_index) = match start_after_signature {
+            None => (Slot::MAX, 0),
+            Some(start_after_signature) => {
+                let TransactionInfo { slot, index, .. } = bigtable
+                    .get_bincode_cell("tx", start_after_signature.to_string())
+                    .await?;
+
+                (slot, index + 1)
+            }
+        };
+
+        let mut infos = vec![];
+
+        // Return the next `limit` tx-by-addr keys
+        let tx_by_addr_info_keys = bigtable
+            .get_row_keys(
+                "tx-by-addr",
+                Some(format!("{}{}", address_prefix, slot_to_key(!first_slot))),
+                limit as i64,
+            )
+            .await?;
+
+        // Read each tx-by-addr object until `limit` signatures have been found
+        'outer: for key in tx_by_addr_info_keys {
+            trace!("key is {}: slot is {}", key, &key[address_prefix.len()..]);
+            if !key.starts_with(&address_prefix) {
+                break 'outer;
+            }
+
+            let slot = !key_to_slot(&key[address_prefix.len()..]).ok_or_else(|| {
+                bigtable::Error::ObjectCorrupt(format!(
+                    "Failed to convert key to slot: tx-by-addr/{}",
+                    key
+                ))
+            })?;
+
+            let tx_by_addr_infos = bigtable
+                .get_bincode_cell::<Vec<TransactionByAddrInfo>>("tx-by-addr", key)
+                .await?;
+
+            for tx_by_addr_info in tx_by_addr_infos
+                .into_iter()
+                .skip(first_transaction_index as usize)
+            {
+                infos.push((
+                    tx_by_addr_info.signature,
+                    slot,
+                    tx_by_addr_info.memo,
+                    tx_by_addr_info.err,
+                ));
+                if infos.len() >= limit {
+                    break 'outer;
+                }
+            }
+
+            first_transaction_index = 0;
+        }
+        Ok(infos)
+    }
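Callers page through an address's history by feeding the last returned signature back in as `start_after_signature`. A hypothetical helper:

```rust
// Sketch: collect an address's full signature history in pages of 100
async fn full_history(
    storage: &LedgerStorage,
    address: &Pubkey,
) -> Result<Vec<Signature>> {
    let mut history = vec![];
    let mut cursor: Option<Signature> = None;
    loop {
        let page = storage
            .get_confirmed_signatures_for_address(address, cursor.as_ref(), 100)
            .await?;
        if page.is_empty() {
            break;
        }
        cursor = Some(page.last().unwrap().0); // resume after the oldest signature seen
        history.extend(page.into_iter().map(|(signature, ..)| signature));
    }
    Ok(history)
}
```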
+
+    // Upload a new confirmed block and associated metadata.
+    pub async fn upload_confirmed_block(
+        &self,
+        slot: Slot,
+        confirmed_block: ConfirmedBlock,
+    ) -> Result<()> {
+        let mut bytes_written = 0;
+
+        let mut by_addr: HashMap<Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();
+
+        let mut tx_cells = vec![];
+        for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
+            let err = transaction_with_meta
+                .meta
+                .as_ref()
+                .and_then(|meta| meta.err.clone());
+            let index = index as u32;
+            let transaction = transaction_with_meta
+                .transaction
+                .decode()
+                .expect("transaction decode failed");
+            let signature = transaction.signatures[0];
+
+            for address in transaction.message.account_keys {
+                if !is_sysvar_id(&address) {
+                    by_addr
+                        .entry(address)
+                        .or_default()
+                        .push(TransactionByAddrInfo {
+                            signature,
+                            err: err.clone(),
+                            index,
+                            memo: None, // TODO
+                        });
+                }
+            }
+
+            tx_cells.push((
+                signature.to_string(),
+                TransactionInfo {
+                    slot,
+                    index,
+                    err,
+                    memo: None, // TODO
+                },
+            ));
+        }
+
+        let tx_by_addr_cells: Vec<_> = by_addr
+            .into_iter()
+            .map(|(address, transaction_info_by_addr)| {
+                (
+                    format!("{}/{}", address, slot_to_key(!slot)),
+                    transaction_info_by_addr,
+                )
+            })
+            .collect();
+
+        if !tx_cells.is_empty() {
+            bytes_written += self
+                .connection
+                .put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
+                .await?;
+        }
+
+        if !tx_by_addr_cells.is_empty() {
+            bytes_written += self
+                .connection
+                .put_bincode_cells_with_retry::<Vec<TransactionByAddrInfo>>(
+                    "tx-by-addr",
+                    &tx_by_addr_cells,
+                )
+                .await?;
+        }
+
+        let num_transactions = confirmed_block.transactions.len();
+
+        // Store the block itself last, after all other metadata about the block has been
+        // successfully stored.  This prevents partially uploaded blocks from becoming visible to
+        // `get_confirmed_block()` and `get_confirmed_blocks()`
+        let blocks_cells = [(slot_to_key(slot), confirmed_block.try_into()?)];
+        bytes_written += self
+            .connection
+            .put_bincode_cells_with_retry::<StoredConfirmedBlock>("blocks", &blocks_cells)
+            .await?;
+        info!(
+            "uploaded block for slot {}: {} transactions, {} bytes",
+            slot, num_transactions, bytes_written
+        );
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_slot_to_key() {
+        assert_eq!(slot_to_key(0), "0000000000000000");
+        assert_eq!(slot_to_key(!0), "ffffffffffffffff");
+    }
+}
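A natural companion test for the reverse mapping (illustrative, not part of the change):

```rust
#[test]
fn test_key_to_slot() {
    // Round trip through the key encoding
    assert_eq!(key_to_slot(&slot_to_key(123)), Some(123));
    // Non-hex keys indicate corrupt bucket data and decode to None
    assert_eq!(key_to_slot("not-a-slot"), None);
}
```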