From de5fb3ba0ea0b3bb541f3dbfaa93b8f071bdaf99 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Mon, 10 Aug 2020 10:27:38 -0600 Subject: [PATCH] Blockstore address signatures: handle slots that cross primary indexes, and refactor get_confirmed_signatures_for_address2 (#11497) * Freeze address-signature index in the middle of slot to show failure case * Secondary filter on signature * Use AddressSignatures iterator instead of manually decrementing slots * Remove unused method * Add metrics * Add transaction-status-index documentation --- ledger/src/blockstore.rs | 271 +++++++++++----------- ledger/src/blockstore/blockstore_purge.rs | 5 + ledger/src/blockstore_db.rs | 4 +- 3 files changed, 142 insertions(+), 138 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index e41e5242df..42c770b8e8 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1671,6 +1671,10 @@ impl Blockstore { .collect() } + /// Initializes the TransactionStatusIndex column family with two records, `0` and `1`, + /// which are used as the primary index for entries in the TransactionStatus and + /// AddressSignatures columns. At any given time, one primary index is active (i.e. new records + /// are stored under this index), the other is frozen. fn initialize_transaction_status_index(&self) -> Result<()> { self.transaction_status_index_cf .put(0, &TransactionStatusIndexMeta::default())?; @@ -1687,6 +1691,8 @@ impl Blockstore { ) } + /// Toggles the active primary index between `0` and `1`, and clears the stored max-slot of the + /// frozen index in preparation for pruning. 
fn toggle_transaction_status_index( &self, batch: &mut WriteBatch, @@ -1902,34 +1908,10 @@ impl Blockstore { } } } - signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap()); + signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap().then(a.1.cmp(&b.1))); Ok(signatures) } - fn get_lowest_slot_for_address(&self, address: Pubkey) -> Result> { - let mut lowest_slot = None; - for transaction_status_cf_primary_index in 0..=1 { - let mut index_iterator = self.address_signatures_cf.iter(IteratorMode::From( - ( - transaction_status_cf_primary_index, - address, - 0, - Signature::default(), - ), - IteratorDirection::Forward, - ))?; - if let Some(((i, key_address, slot, _), _)) = index_iterator.next() { - if i == transaction_status_cf_primary_index - && key_address == address - && slot < lowest_slot.unwrap_or(Slot::MAX) - { - lowest_slot = Some(slot); - } - } - } - Ok(lowest_slot) - } - pub fn get_confirmed_signatures_for_address( &self, pubkey: Pubkey, @@ -1967,7 +1949,8 @@ impl Blockstore { // Figure the `slot` to start listing signatures at, based on the ledger location of the // `before` signature if present. Also generate a HashSet of signatures that should // be excluded from the results. 
- let (mut slot, mut excluded_signatures) = match before { + let mut get_before_slot_timer = Measure::start("get_before_slot_timer"); + let (slot, mut excluded_signatures) = match before { None => (highest_confirmed_root, None), Some(before) => { let transaction_status = self.get_transaction_status(before)?; @@ -1999,9 +1982,10 @@ impl Blockstore { .collect(); // Sort signatures as a way to entire a stable ordering within a slot, as - // `self.find_address_signatures()` is ordered by signatures ordered and + // the AddressSignatures column is ordered by signatures within a slot, // not by block ordering slot_signatures.sort(); + slot_signatures.reverse(); if let Some(pos) = slot_signatures.iter().position(|&x| x == before) { slot_signatures.truncate(pos + 1); @@ -2015,41 +1999,96 @@ impl Blockstore { } } }; + get_before_slot_timer.stop(); // Fetch the list of signatures that affect the given address let first_available_block = self.get_first_available_block()?; - let first_address_slot = self.get_lowest_slot_for_address(address)?; - if first_address_slot.is_none() { - return Ok(vec![]); - } - let lower_bound = cmp::max(first_available_block, first_address_slot.unwrap()); let mut address_signatures = vec![]; - loop { - if address_signatures.len() >= limit { - address_signatures.truncate(limit); - break; - } - let mut signatures = self.find_address_signatures(address, slot, slot)?; - if let Some(excluded_signatures) = excluded_signatures.take() { - address_signatures.extend( - signatures - .into_iter() - .filter(|(_, signature)| !excluded_signatures.contains(&signature)), - ) - } else { - address_signatures.append(&mut signatures); - } - excluded_signatures = None; - - if slot == lower_bound { - break; - } - slot -= 1; + // Get signatures in `slot` + let mut get_initial_slot_timer = Measure::start("get_initial_slot_timer"); + let mut signatures = self.find_address_signatures(address, slot, slot)?; + signatures.reverse(); + if let Some(excluded_signatures) = 
excluded_signatures.take() { + address_signatures.extend( + signatures + .into_iter() + .filter(|(_, signature)| !excluded_signatures.contains(&signature)), + ) + } else { + address_signatures.append(&mut signatures); } + get_initial_slot_timer.stop(); + + // Check the active_transaction_status_index to see if it contains slot. If so, start with + // that index, as it will contain higher slots + let starting_primary_index = *self.active_transaction_status_index.read().unwrap(); + let next_primary_index = if starting_primary_index == 0 { 1 } else { 0 }; + let next_max_slot = self + .transaction_status_index_cf + .get(next_primary_index)? + .unwrap() + .max_slot; + + let mut starting_primary_index_iter_timer = Measure::start("starting_primary_index_iter"); + if slot > next_max_slot { + let mut starting_iterator = self.address_signatures_cf.iter(IteratorMode::From( + (starting_primary_index, address, slot, Signature::default()), + IteratorDirection::Reverse, + ))?; + + // Iterate through starting_iterator until limit is reached + while address_signatures.len() < limit { + if let Some(((i, key_address, slot, signature), _)) = starting_iterator.next() { + if slot == next_max_slot { + break; + } + if i == starting_primary_index + && key_address == address + && slot >= first_available_block + { + address_signatures.push((slot, signature)); + continue; + } + } + break; + } + + // Handle slots that cross primary indexes + let mut signatures = + self.find_address_signatures(address, next_max_slot, next_max_slot)?; + signatures.reverse(); + address_signatures.append(&mut signatures); + } + starting_primary_index_iter_timer.stop(); + + // Iterate through next_iterator until limit is reached + let mut next_primary_index_iter_timer = Measure::start("next_primary_index_iter_timer"); + let mut next_iterator = self.address_signatures_cf.iter(IteratorMode::From( + (next_primary_index, address, slot, Signature::default()), + IteratorDirection::Reverse, + ))?; + while 
address_signatures.len() < limit { + if let Some(((i, key_address, slot, signature), _)) = next_iterator.next() { + // Skip next_max_slot, which is already included + if slot == next_max_slot { + continue; + } + if i == next_primary_index + && key_address == address + && slot >= first_available_block + { + address_signatures.push((slot, signature)); + continue; + } + } + break; + } + next_primary_index_iter_timer.stop(); address_signatures.truncate(limit); // Fill in the status information for each found transaction + let mut get_status_info_timer = Measure::start("get_status_info_timer"); let mut infos = vec![]; for (slot, signature) in address_signatures.into_iter() { let transaction_status = self.get_transaction_status(signature)?; @@ -2064,6 +2103,36 @@ impl Blockstore { memo: None, }); } + get_status_info_timer.stop(); + + datapoint_info!( + "blockstore-get-conf-sigs-for-addr-2", + ( + "get_before_slot_us", + get_before_slot_timer.as_us() as i64, + i64 + ), + ( + "get_initial_slot_us", + get_initial_slot_timer.as_us() as i64, + i64 + ), + ( + "starting_primary_index_iter_us", + starting_primary_index_iter_timer.as_us() as i64, + i64 + ), + ( + "next_primary_index_iter_us", + next_primary_index_iter_timer.as_us() as i64, + i64 + ), + ( + "get_status_info_us", + get_status_info_timer.as_us() as i64, + i64 + ) + ); Ok(infos) } @@ -6242,82 +6311,6 @@ pub mod tests { Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } - #[test] - fn test_get_lowest_slot_for_address() { - let blockstore_path = get_tmp_ledger_path!(); - { - let blockstore = Blockstore::open(&blockstore_path).unwrap(); - let address = Pubkey::new_rand(); - let address2 = Pubkey::new_rand(); - let slot = 5; - // Add an additional to record to ensure that existent or lower slots in entries for - // other addresses do not affect return - blockstore - .address_signatures_cf - .put( - (0, address2, slot, Signature::default()), - &AddressSignatureMeta { writeable: 
false }, - ) - .unwrap(); - assert_eq!( - blockstore.get_lowest_slot_for_address(address).unwrap(), - None - ); - - let slot = 200; - blockstore - .address_signatures_cf - .put( - (0, address, slot, Signature::default()), - &AddressSignatureMeta { writeable: false }, - ) - .unwrap(); - assert_eq!( - blockstore.get_lowest_slot_for_address(address).unwrap(), - Some(200) - ); - - blockstore - .address_signatures_cf - .put( - (1, address, slot, Signature::default()), - &AddressSignatureMeta { writeable: false }, - ) - .unwrap(); - assert_eq!( - blockstore.get_lowest_slot_for_address(address).unwrap(), - Some(200) - ); - - let slot = 300; - blockstore - .address_signatures_cf - .put( - (1, address, slot, Signature::default()), - &AddressSignatureMeta { writeable: false }, - ) - .unwrap(); - assert_eq!( - blockstore.get_lowest_slot_for_address(address).unwrap(), - Some(200) - ); - - let slot = 100; - blockstore - .address_signatures_cf - .put( - (1, address, slot, Signature::default()), - &AddressSignatureMeta { writeable: false }, - ) - .unwrap(); - assert_eq!( - blockstore.get_lowest_slot_for_address(address).unwrap(), - Some(100) - ); - } - Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); - } - #[test] fn test_get_confirmed_signatures_for_address2() { let blockstore_path = get_tmp_ledger_path!(); @@ -6344,14 +6337,18 @@ pub mod tests { let address0 = Pubkey::new_rand(); let address1 = Pubkey::new_rand(); - for slot in 2..=4 { + for slot in 2..=7 { let entries = make_slot_entries_with_transaction_addresses(&[ address0, address1, address0, address1, ]); let shreds = entries_to_test_shreds(entries.clone(), slot, slot - 1, true, 0); blockstore.insert_shreds(shreds, None, false).unwrap(); - for entry in &entries { + for (i, entry) in entries.iter().enumerate() { + if slot == 4 && i == 2 { + // Purge to freeze index 0 and write address-signatures in new primary index + blockstore.run_purge(0, 1, PurgeType::PrimaryIndex).unwrap(); + } 
for transaction in &entry.transactions { assert_eq!(transaction.signatures.len(), 1); blockstore @@ -6366,8 +6363,8 @@ pub mod tests { } } } - blockstore.set_roots(&[1, 2, 3, 4]).unwrap(); - let highest_confirmed_root = 4; + blockstore.set_roots(&[1, 2, 3, 4, 5, 6, 7]).unwrap(); + let highest_confirmed_root = 7; // Fetch all signatures for address 0 at once... let all0 = blockstore @@ -6378,7 +6375,7 @@ pub mod tests { usize::MAX, ) .unwrap(); - assert_eq!(all0.len(), 6); + assert_eq!(all0.len(), 12); // Fetch all signatures for address 1 at once... let all1 = blockstore @@ -6389,7 +6386,7 @@ pub mod tests { usize::MAX, ) .unwrap(); - assert_eq!(all1.len(), 6); + assert_eq!(all1.len(), 12); assert!(all0 != all1); @@ -6442,7 +6439,7 @@ pub mod tests { assert_eq!(results[2], all0[i + 2]); } - // Ensure that the signatures within a slot are ordered by signature + // Ensure that the signatures within a slot are reverse ordered by signature // (current limitation of the .get_confirmed_signatures_for_address2()) for i in (0..all1.len()).step_by(2) { let results = blockstore @@ -6459,7 +6456,7 @@ pub mod tests { .unwrap(); assert_eq!(results.len(), 2); assert_eq!(results[0].slot, results[1].slot); - assert!(results[0].signature <= results[1].signature); + assert!(results[0].signature >= results[1].signature); assert_eq!(results[0], all1[i]); assert_eq!(results[1], all1[i + 1]); } diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index fdffb157b6..649fc5f5f3 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -235,6 +235,9 @@ impl Blockstore { Ok(result) } + /// Purges special columns (using a non-Slot primary-index) exactly, by deserializing each slot + /// being purged and iterating through all transactions to determine the keys of individual + /// records. 
**This method is very slow.** fn purge_special_columns_exact( &self, batch: &mut WriteBatch, @@ -279,6 +282,8 @@ impl Blockstore { Ok(()) } + /// Purges special columns (using a non-Slot primary-index) by range. Purge occurs if frozen + /// primary index has a max-slot less than the highest slot being purged. fn purge_special_columns_with_primary_index( &self, write_batch: &mut WriteBatch, diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 93409f23d9..f95e8c95f2 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -40,7 +40,9 @@ const CODE_SHRED_CF: &str = "code_shred"; const TRANSACTION_STATUS_CF: &str = "transaction_status"; /// Column family for Address Signatures const ADDRESS_SIGNATURES_CF: &str = "address_signatures"; -/// Column family for Transaction Status Index +/// Column family for the Transaction Status Index. +/// This column family is used for tracking the active primary index for columns that for +/// query performance reasons should not be indexed by Slot. const TRANSACTION_STATUS_INDEX_CF: &str = "transaction_status_index"; /// Column family for Rewards const REWARDS_CF: &str = "rewards";