diff --git a/cli/src/cli.rs b/cli/src/cli.rs index ff6fe59b6..8fd4f6373 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -216,6 +216,7 @@ pub enum CliCommand { TransactionHistory { address: Pubkey, before: Option, + until: Option, limit: usize, }, // Nonce commands @@ -1508,8 +1509,9 @@ pub fn process_command(config: &CliConfig) -> ProcessResult { CliCommand::TransactionHistory { address, before, + until, limit, - } => process_transaction_history(&rpc_client, config, address, *before, *limit), + } => process_transaction_history(&rpc_client, config, address, *before, *until, *limit), // Nonce Commands diff --git a/cli/src/cluster_query.rs b/cli/src/cluster_query.rs index 76e64bb4b..0bfeb478e 100644 --- a/cli/src/cluster_query.rs +++ b/cli/src/cluster_query.rs @@ -11,7 +11,7 @@ use solana_clap_utils::{ }; use solana_client::{ pubsub_client::{PubsubClient, SlotInfoMessage}, - rpc_client::RpcClient, + rpc_client::{GetConfirmedSignaturesForAddress2Config, RpcClient}, rpc_config::{RpcLargestAccountsConfig, RpcLargestAccountsFilter}, }; use solana_remote_wallet::remote_wallet::RemoteWalletManager; @@ -444,12 +444,21 @@ pub fn parse_transaction_history( ), None => None, }; + let until = match matches.value_of("until") { + Some(signature) => Some( + signature + .parse() + .map_err(|err| CliError::BadParameter(format!("Invalid signature: {}", err)))?, + ), + None => None, + }; let limit = value_t_or_exit!(matches, "limit", usize); Ok(CliCommandInfo { command: CliCommand::TransactionHistory { address, before, + until, limit, }, signers: vec![], @@ -1311,12 +1320,16 @@ pub fn process_transaction_history( config: &CliConfig, address: &Pubkey, before: Option, + until: Option, limit: usize, ) -> ProcessResult { let results = rpc_client.get_confirmed_signatures_for_address2_with_config( address, - before, - Some(limit), + GetConfirmedSignaturesForAddress2Config { + before, + until, + limit: Some(limit), + }, )?; let transactions_found = format!("{} transactions found", results.len()); diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index b9d2cafc9..0c4f05998 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -299,18 +299,21 @@ impl RpcClient { &self, address: &Pubkey, ) -> ClientResult> { - self.get_confirmed_signatures_for_address2_with_config(address, None, None) + self.get_confirmed_signatures_for_address2_with_config( + address, + GetConfirmedSignaturesForAddress2Config::default(), + ) } pub fn get_confirmed_signatures_for_address2_with_config( &self, address: &Pubkey, - before: Option, - limit: Option, + config: GetConfirmedSignaturesForAddress2Config, ) -> ClientResult> { let config = RpcGetConfirmedSignaturesForAddress2Config { - before: before.map(|signature| signature.to_string()), - limit, + before: config.before.map(|signature| signature.to_string()), + until: config.until.map(|signature| signature.to_string()), + limit: config.limit, }; let result: Vec = self.send( @@ -1134,6 +1137,13 @@ impl RpcClient { } } +#[derive(Debug, Default)] +pub struct GetConfirmedSignaturesForAddress2Config { + pub before: Option, + pub until: Option, + pub limit: Option, +} + fn new_spinner_progress_bar() -> ProgressBar { let progress_bar = ProgressBar::new(42); progress_bar diff --git a/client/src/rpc_config.rs b/client/src/rpc_config.rs index 868c6c754..1c5116964 100644 --- a/client/src/rpc_config.rs +++ b/client/src/rpc_config.rs @@ -71,5 +71,6 @@ pub enum RpcTokenAccountsFilter { #[serde(rename_all = "camelCase")] pub struct RpcGetConfirmedSignaturesForAddress2Config { pub before: Option, // Signature as base-58 string + pub until: Option, // Signature as base-58 string pub limit: Option, } diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 75234c6c3..438676669 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -873,6 +873,7 @@ impl JsonRpcRequestProcessor { &self, address: Pubkey, mut before: Option, + until: Option, mut limit: usize, ) -> Result> { if self.config.enable_rpc_transaction_history { @@ -888,6 +889,7 @@ impl JsonRpcRequestProcessor { address, highest_confirmed_root, before, + until, limit, ) .map_err(|err| Error::invalid_params(format!("{}", err)))?; @@ -903,6 +905,7 @@ impl JsonRpcRequestProcessor { bigtable_ledger_storage.get_confirmed_signatures_for_address( &address, before.as_ref(), + until.as_ref(), limit, ), ); @@ -2312,6 +2315,11 @@ impl RpcSol for RpcSolImpl { } else { None }; + let until = if let Some(until) = config.until { + Some(verify_signature(&until)?) + } else { + None + }; let limit = config .limit .unwrap_or(MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT); @@ -2323,7 +2331,7 @@ impl RpcSol for RpcSolImpl { ))); } - meta.get_confirmed_signatures_for_address2(address, before, limit) + meta.get_confirmed_signatures_for_address2(address, before, until, limit) } fn get_first_available_block(&self, meta: Self::Metadata) -> Result { diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs index 4ffae9888..db70f7283 100644 --- a/ledger-tool/src/bigtable.rs +++ b/ledger-tool/src/bigtable.rs @@ -310,13 +310,19 @@ pub async fn transaction_history( address: &Pubkey, mut limit: usize, mut before: Option, + until: Option, verbose: bool, ) -> Result<(), Box> { let bigtable = solana_storage_bigtable::LedgerStorage::new(true).await?; while limit > 0 { let results = bigtable - .get_confirmed_signatures_for_address(address, before.as_ref(), limit.min(1000)) + .get_confirmed_signatures_for_address( + address, + before.as_ref(), + until.as_ref(), + limit.min(1000), + ) .await?; if results.is_empty() { @@ -480,6 +486,13 @@ impl BigTableSubCommand for App<'_, '_> { .takes_value(true) .help("Start with the first signature older than this one"), ) + .arg( + Arg::with_name("until") + .long("until") + .value_name("TRANSACTION_SIGNATURE") + .takes_value(true) + .help("End with the last signature newer than this one"), + ) .arg( Arg::with_name("verbose") .short("v") @@ -537,9 +550,12 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) { let before = arg_matches .value_of("before") .map(|signature| signature.parse().expect("Invalid signature")); + let until = arg_matches + .value_of("until") + .map(|signature| signature.parse().expect("Invalid signature")); let verbose = arg_matches.is_present("verbose"); - runtime.block_on(transaction_history(&address, limit, before, verbose)) + runtime.block_on(transaction_history(&address, limit, before, until, verbose)) } _ => unreachable!(), }; diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index a605678d4..82cd9a551 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1935,6 +1935,7 @@ impl Blockstore { address: Pubkey, highest_confirmed_root: Slot, before: Option, + until: Option, limit: usize, ) -> Result> { datapoint_info!( @@ -1950,7 +1951,7 @@ impl Blockstore { // `before` signature if present. Also generate a HashSet of signatures that should // be excluded from the results. let mut get_before_slot_timer = Measure::start("get_before_slot_timer"); - let (slot, mut excluded_signatures) = match before { + let (slot, mut before_excluded_signatures) = match before { None => (highest_confirmed_root, None), Some(before) => { let transaction_status = self.get_transaction_status(before)?; @@ -2001,6 +2002,57 @@ impl Blockstore { }; get_before_slot_timer.stop(); + // Generate a HashSet of signatures that should be excluded from the results based on + // `until` signature + let mut get_until_slot_timer = Measure::start("get_until_slot_timer"); + let (lowest_slot, until_excluded_signatures) = match until { + None => (0, HashSet::new()), + Some(until) => { + let transaction_status = self.get_transaction_status(until)?; + match transaction_status { + None => (0, HashSet::new()), + Some((slot, _)) => { + let confirmed_block = self + .get_confirmed_block(slot, Some(UiTransactionEncoding::Binary)) + .map_err(|err| { + BlockstoreError::IO(IOError::new( + ErrorKind::Other, + format!("Unable to get confirmed block: {}", err), + )) + })?; + + // Load all signatures for the block + let mut slot_signatures: Vec<_> = confirmed_block + .transactions + .iter() + .filter_map(|transaction_with_meta| { + if let Some(transaction) = + transaction_with_meta.transaction.decode() + { + transaction.signatures.into_iter().next() + } else { + None + } + }) + .collect(); + + // Sort signatures as a way to entire a stable ordering within a slot, as + // the AddressSignatures column is ordered by signatures within a slot, + // not by block ordering + slot_signatures.sort(); + slot_signatures.reverse(); + + if let Some(pos) = slot_signatures.iter().position(|&x| x == until) { + slot_signatures = slot_signatures.split_off(pos); + } + + (slot, slot_signatures.into_iter().collect::>()) + } + } + } + }; + get_until_slot_timer.stop(); + // Fetch the list of signatures that affect the given address let first_available_block = self.get_first_available_block()?; let mut address_signatures = vec![]; @@ -2009,7 +2061,7 @@ impl Blockstore { let mut get_initial_slot_timer = Measure::start("get_initial_slot_timer"); let mut signatures = self.find_address_signatures(address, slot, slot)?; signatures.reverse(); - if let Some(excluded_signatures) = excluded_signatures.take() { + if let Some(excluded_signatures) = before_excluded_signatures.take() { address_signatures.extend( signatures .into_iter() @@ -2040,7 +2092,7 @@ impl Blockstore { // Iterate through starting_iterator until limit is reached while address_signatures.len() < limit { if let Some(((i, key_address, slot, signature), _)) = starting_iterator.next() { - if slot == next_max_slot { + if slot == next_max_slot || slot < lowest_slot { break; } if i == starting_primary_index @@ -2057,10 +2109,12 @@ impl Blockstore { } // Handle slots that cross primary indexes - let mut signatures = - self.find_address_signatures(address, next_max_slot, next_max_slot)?; - signatures.reverse(); - address_signatures.append(&mut signatures); + if next_max_slot >= lowest_slot { + let mut signatures = + self.find_address_signatures(address, next_max_slot, next_max_slot)?; + signatures.reverse(); + address_signatures.append(&mut signatures); + } } starting_primary_index_iter_timer.stop(); @@ -2076,6 +2130,9 @@ impl Blockstore { if slot == next_max_slot { continue; } + if slot < lowest_slot { + break; + } if i == next_primary_index && key_address == address && slot >= first_available_block @@ -2089,6 +2146,10 @@ impl Blockstore { break; } next_primary_index_iter_timer.stop(); + let mut address_signatures: Vec<(Slot, Signature)> = address_signatures + .into_iter() + .filter(|(_, signature)| !until_excluded_signatures.contains(&signature)) + .collect(); address_signatures.truncate(limit); // Fill in the status information for each found transaction @@ -2135,6 +2196,11 @@ impl Blockstore { "get_status_info_us", get_status_info_timer.as_us() as i64, i64 + ), + ( + "get_until_slot_us", + get_until_slot_timer.as_us() as i64, + i64 ) ); @@ -6377,6 +6443,7 @@ pub mod tests { address0, highest_confirmed_root, None, + None, usize::MAX, ) .unwrap(); @@ -6388,6 +6455,7 @@ pub mod tests { address1, highest_confirmed_root, None, + None, usize::MAX, ) .unwrap(); @@ -6406,23 +6474,58 @@ pub mod tests { } else { Some(all0[i - 1].signature) }, + None, 1, ) .unwrap(); assert_eq!(results.len(), 1); assert_eq!(results[0], all0[i], "Unexpected result for {}", i); } + // Fetch all signatures for address 0 individually using `until` + for i in 0..all0.len() { + let results = blockstore + .get_confirmed_signatures_for_address2( + address0, + highest_confirmed_root, + if i == 0 { + None + } else { + Some(all0[i - 1].signature) + }, + if i == all0.len() - 1 || i == all0.len() { + None + } else { + Some(all0[i + 1].signature) + }, + 10, + ) + .unwrap(); + assert_eq!(results.len(), 1); + assert_eq!(results[0], all0[i], "Unexpected result for {}", i); + } assert!(blockstore .get_confirmed_signatures_for_address2( address0, highest_confirmed_root, Some(all0[all0.len() - 1].signature), + None, 1, ) .unwrap() .is_empty()); + assert!(blockstore + .get_confirmed_signatures_for_address2( + address0, + highest_confirmed_root, + None, + Some(all0[0].signature), + 2, + ) + .unwrap() + .is_empty()); + // Fetch all signatures for address 0, three at a time assert!(all0.len() % 3 == 0); for i in (0..all0.len()).step_by(3) { @@ -6435,6 +6538,7 @@ pub mod tests { } else { Some(all0[i - 1].signature) }, + None, 3, ) .unwrap(); @@ -6456,6 +6560,7 @@ pub mod tests { } else { Some(all1[i - 1].signature) }, + None, 2, ) .unwrap(); @@ -6466,18 +6571,30 @@ pub mod tests { assert_eq!(results[1], all1[i + 1]); } - // A search for address 0 with a `before` signature from address1 should also work + // A search for address 0 with `before` and/or `until` signatures from address1 should also work let results = blockstore .get_confirmed_signatures_for_address2( address0, highest_confirmed_root, Some(all1[0].signature), + None, usize::MAX, ) .unwrap(); // The exact number of results returned is variable, based on the sort order of the // random signatures that are generated assert!(!results.is_empty()); + + let results2 = blockstore + .get_confirmed_signatures_for_address2( + address0, + highest_confirmed_root, + Some(all1[0].signature), + Some(all1[4].signature), + usize::MAX, + ) + .unwrap(); + assert!(results2.len() < results.len()); } Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } diff --git a/storage-bigtable/src/bigtable.rs b/storage-bigtable/src/bigtable.rs index 99b25a45b..b988ade47 100644 --- a/storage-bigtable/src/bigtable.rs +++ b/storage-bigtable/src/bigtable.rs @@ -29,6 +29,7 @@ pub type RowKey = String; pub type CellName = String; pub type CellValue = Vec; pub type RowData = Vec<(CellName, CellValue)>; +pub type RowDataSlice<'a> = &'a [(CellName, CellValue)]; #[derive(Debug, Error)] pub enum Error { @@ -287,10 +288,14 @@ impl BigTable { /// /// If `start_at` is provided, the row key listing will start with key. /// Otherwise the listing will start from the start of the table. + /// + /// If `end_at` is provided, the row key listing will end at the key. Otherwise it will + /// continue until the `limit` is reached or the end of the table, whichever comes first. pub async fn get_row_keys( &mut self, table_name: &str, start_at: Option, + end_at: Option, rows_limit: i64, ) -> Result> { self.refresh_access_token().await; @@ -301,16 +306,13 @@ impl BigTable { rows_limit, rows: Some(RowSet { row_keys: vec![], - row_ranges: if let Some(row_key) = start_at { - vec![RowRange { - start_key: Some(row_range::StartKey::StartKeyClosed( - row_key.into_bytes(), - )), - end_key: None, - }] - } else { - vec![] - }, + row_ranges: vec![RowRange { + start_key: start_at.map(|row_key| { + row_range::StartKey::StartKeyClosed(row_key.into_bytes()) + }), + end_key: end_at + .map(|row_key| row_range::EndKey::EndKeyClosed(row_key.into_bytes())), + }], }), filter: Some(RowFilter { filter: Some(row_filter::Filter::Chain(row_filter::Chain { @@ -339,21 +341,39 @@ impl BigTable { Ok(rows.into_iter().map(|r| r.0).collect()) } - /// Get latest data from `limit` rows of `table`, starting inclusively at the `row_key` row. + /// Get latest data from `table`. /// /// All column families are accepted, and only the latest version of each column cell will be /// returned. - pub async fn get_row_data(&mut self, table_name: &str, row_key: RowKey) -> Result { + /// + /// If `start_at` is provided, the row key listing will start with key. + /// Otherwise the listing will start from the start of the table. + /// + /// If `end_at` is provided, the row key listing will end at the key. Otherwise it will + /// continue until the `limit` is reached or the end of the table, whichever comes first. + pub async fn get_row_data( + &mut self, + table_name: &str, + start_at: Option, + end_at: Option, + rows_limit: i64, + ) -> Result> { self.refresh_access_token().await; let response = self .client .read_rows(ReadRowsRequest { table_name: format!("{}{}", self.table_prefix, table_name), - rows_limit: 1, + rows_limit, rows: Some(RowSet { - row_keys: vec![row_key.into_bytes()], - row_ranges: vec![], + row_keys: vec![], + row_ranges: vec![RowRange { + start_key: start_at.map(|row_key| { + row_range::StartKey::StartKeyClosed(row_key.into_bytes()) + }), + end_key: end_at + .map(|row_key| row_range::EndKey::EndKeyClosed(row_key.into_bytes())), + }], }), filter: Some(RowFilter { // Only return the latest version of each cell @@ -364,11 +384,7 @@ impl BigTable { .await? .into_inner(); - let rows = Self::decode_read_rows_response(response).await?; - rows.into_iter() - .next() - .map(|r| r.1) - .ok_or_else(|| Error::RowNotFound) + Self::decode_read_rows_response(response).await } /// Store data for one or more `table` rows in the `family_name` Column family @@ -429,19 +445,10 @@ impl BigTable { where T: serde::de::DeserializeOwned, { - let row_data = self.get_row_data(table, key.clone()).await?; + let row_data = self.get_row_data(table, Some(key.clone()), None, 1).await?; + let (row_key, data) = &row_data[0]; - let value = row_data - .into_iter() - .find(|(name, _)| name == "bin") - .ok_or_else(|| Error::ObjectNotFound(format!("{}/{}", table, key)))? - .1; - - let data = decompress(&value)?; - bincode::deserialize(&data).map_err(|err| { - warn!("Failed to deserialize {}/{}: {}", table, key, err); - Error::ObjectCorrupt(format!("{}/{}", table, key)) - }) + deserialize_cell_data(data, table, row_key.to_string()) } pub async fn put_bincode_cells( @@ -464,3 +471,24 @@ impl BigTable { Ok(bytes_written) } } + +pub(crate) fn deserialize_cell_data( + row_data: RowDataSlice, + table: &str, + key: RowKey, +) -> Result +where + T: serde::de::DeserializeOwned, +{ + let value = &row_data + .iter() + .find(|(name, _)| name == "bin") + .ok_or_else(|| Error::ObjectNotFound(format!("{}/{}", table, key)))? + .1; + + let data = decompress(&value)?; + bincode::deserialize(&data).map_err(|err| { + warn!("Failed to deserialize {}/{}: {}", table, key, err); + Error::ObjectCorrupt(format!("{}/{}", table, key)) + }) +} diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs index a3669a857..00fe352e6 100644 --- a/storage-bigtable/src/lib.rs +++ b/storage-bigtable/src/lib.rs @@ -276,7 +276,7 @@ impl LedgerStorage { /// Return the available slot that contains a block pub async fn get_first_available_block(&self) -> Result> { let mut bigtable = self.connection.client(); - let blocks = bigtable.get_row_keys("blocks", None, 1).await?; + let blocks = bigtable.get_row_keys("blocks", None, None, 1).await?; if blocks.is_empty() { return Ok(None); } @@ -290,7 +290,7 @@ impl LedgerStorage { pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result> { let mut bigtable = self.connection.client(); let blocks = bigtable - .get_row_keys("blocks", Some(slot_to_key(start_slot)), limit as i64) + .get_row_keys("blocks", Some(slot_to_key(start_slot)), None, limit as i64) .await?; Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect()) } @@ -365,13 +365,14 @@ impl LedgerStorage { &self, address: &Pubkey, before_signature: Option<&Signature>, + until_signature: Option<&Signature>, limit: usize, ) -> Result> { let mut bigtable = self.connection.client(); let address_prefix = format!("{}/", address); // Figure out where to start listing from based on `before_signature` - let (first_slot, mut before_transaction_index) = match before_signature { + let (first_slot, before_transaction_index) = match before_signature { None => (Slot::MAX, 0), Some(before_signature) => { let TransactionInfo { slot, index, .. } = bigtable @@ -382,6 +383,18 @@ impl LedgerStorage { } }; + // Figure out where to end listing from based on `until_signature` + let (last_slot, until_transaction_index) = match until_signature { + None => (0, u32::MAX), + Some(until_signature) => { + let TransactionInfo { slot, index, .. } = bigtable + .get_bincode_cell("tx", until_signature.to_string()) + .await?; + + (slot, index) + } + }; + let mut infos = vec![]; let starting_slot_tx_by_addr_infos = bigtable @@ -391,50 +404,46 @@ impl LedgerStorage { ) .await?; - // Return the next tx-by-addr keys of amount `limit` plus extra to account for the largest + // Return the next tx-by-addr data of amount `limit` plus extra to account for the largest // number that might be flitered out - let tx_by_addr_info_keys = bigtable - .get_row_keys( + let tx_by_addr_data = bigtable + .get_row_data( "tx-by-addr", Some(format!("{}{}", address_prefix, slot_to_key(!first_slot))), + Some(format!("{}{}", address_prefix, slot_to_key(!last_slot))), limit as i64 + starting_slot_tx_by_addr_infos.len() as i64, ) .await?; - // Read each tx-by-addr object until `limit` signatures have been found - 'outer: for key in tx_by_addr_info_keys { - trace!("key is {}: slot is {}", key, &key[address_prefix.len()..]); - if !key.starts_with(&address_prefix) { - break 'outer; - } - - let slot = !key_to_slot(&key[address_prefix.len()..]).ok_or_else(|| { + 'outer: for (row_key, data) in tx_by_addr_data { + let slot = !key_to_slot(&row_key[address_prefix.len()..]).ok_or_else(|| { bigtable::Error::ObjectCorrupt(format!( "Failed to convert key to slot: tx-by-addr/{}", - key + row_key )) })?; - - let tx_by_addr_infos = bigtable - .get_bincode_cell::>("tx-by-addr", key) - .await?; - - for tx_by_addr_info in tx_by_addr_infos - .into_iter() - .filter(|tx_by_addr_info| tx_by_addr_info.index < before_transaction_index) - { + let cell_data: Vec = + bigtable::deserialize_cell_data(&data, "tx-by-addr", row_key)?; + for tx_by_addr_info in cell_data.into_iter() { + // Filter out records before `before_transaction_index` + if slot == first_slot && tx_by_addr_info.index >= before_transaction_index { + continue; + } + // Filter out records after `until_transaction_index` + if slot == last_slot && tx_by_addr_info.index <= until_transaction_index { + continue; + } infos.push(ConfirmedTransactionStatusWithSignature { signature: tx_by_addr_info.signature, slot, err: tx_by_addr_info.err, memo: tx_by_addr_info.memo, }); + // Respect limit if infos.len() >= limit { break 'outer; } } - - before_transaction_index = u32::MAX; } Ok(infos) }